<!DOCTYPE html><html lang="zh-CN" data-theme="light"><head><meta charset="UTF-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><meta name="viewport" content="width=device-width,initial-scale=1"><title>RabbitMq学习笔记 | 十三不知</title><meta name="keywords" content="MQ"><meta name="author" content="张十三"><meta name="copyright" content="张十三"><meta name="format-detection" content="telephone=no"><meta name="theme-color" content="#ffffff"><meta name="description" content="RabbitMq学习笔记 RabbitMQ简介 RabbitMQ 是一个由 Erlang 语言开发的 AMQP协议 的开源实现，用来通过普通协议在完全不同的应用之间完成数据共享。 Erlang语言最初在于交换机领域的架构模式，这样使得 RabbitMQ在Broker之间进行数据交互的性能是非常优秀的 Erlang的优点：Erlang有着和原生Socket—样的延迟 AMQP ： Advanced">
<meta property="og:type" content="article">
<meta property="og:title" content="RabbitMq学习笔记">
<meta property="og:url" content="http://zuori.gitee.io/zaker/2021/03/26/RabbitMq%E5%9F%BA%E7%A1%80%E4%BD%BF%E7%94%A8/index.html">
<meta property="og:site_name" content="十三不知">
<meta property="og:description" content="RabbitMq学习笔记 RabbitMQ简介 RabbitMQ 是一个由 Erlang 语言开发的 AMQP协议 的开源实现，用来通过普通协议在完全不同的应用之间完成数据共享。 Erlang语言最初在于交换机领域的架构模式，这样使得 RabbitMQ在Broker之间进行数据交互的性能是非常优秀的 Erlang的优点：Erlang有着和原生Socket—样的延迟 AMQP ： Advanced">
<meta property="og:locale" content="zh_CN">
<meta property="og:image" content="https://cdn.jsdelivr.net/npm/butterfly-extsrc@1/img/default.jpg">
<meta property="article:published_time" content="2021-03-26T01:46:36.000Z">
<meta property="article:modified_time" content="2021-03-26T09:09:07.587Z">
<meta property="article:author" content="张十三">
<meta property="article:tag" content="MQ">
<meta name="twitter:card" content="summary">
<meta name="twitter:image" content="https://cdn.jsdelivr.net/npm/butterfly-extsrc@1/img/default.jpg"><link rel="shortcut icon" href="https://gitee.com/zuori/kaiscript/raw/master/images/20210325203813.png"><link rel="canonical" href="http://zuori.gitee.io/zaker/2021/03/26/RabbitMq%E5%9F%BA%E7%A1%80%E4%BD%BF%E7%94%A8/"><link rel="preconnect" href="//cdn.jsdelivr.net"/><link rel="preconnect" href="//busuanzi.ibruce.info"/><link rel="stylesheet" href="/zaker/css/index.css"><link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/@fortawesome/fontawesome-free/css/all.min.css" media="print" onload="this.media='all'"><script>const GLOBAL_CONFIG = { 
  root: '/zaker/',
  algolia: undefined,
  localSearch: undefined,
  translate: undefined,
  noticeOutdate: undefined,
  highlight: {"plugin":"highlighjs","highlightCopy":true,"highlightLang":true,"highlightHeightLimit":false},
  copy: {
    success: '复制成功',
    error: '复制错误',
    noSupport: '浏览器不支持'
  },
  relativeDate: {
    homepage: false,
    post: false
  },
  runtime: '天',
  date_suffix: {
    just: '刚刚',
    min: '分钟前',
    hour: '小时前',
    day: '天前',
    month: '个月前'
  },
  copyright: {"limitCount":50,"languages":{"author":"作者: 张十三","link":"链接: ","source":"来源: 十三不知","info":"著作权归作者所有。商业转载请联系作者获得授权，非商业转载请注明出处。"}},
  lightbox: 'fancybox',
  Snackbar: undefined,
  source: {
    jQuery: 'https://cdn.jsdelivr.net/npm/jquery@latest/dist/jquery.min.js',
    justifiedGallery: {
      js: 'https://cdn.jsdelivr.net/npm/justifiedGallery/dist/js/jquery.justifiedGallery.min.js',
      css: 'https://cdn.jsdelivr.net/npm/justifiedGallery/dist/css/justifiedGallery.min.css'
    },
    fancybox: {
      js: 'https://cdn.jsdelivr.net/npm/@fancyapps/fancybox@latest/dist/jquery.fancybox.min.js',
      css: 'https://cdn.jsdelivr.net/npm/@fancyapps/fancybox@latest/dist/jquery.fancybox.min.css'
    }
  },
  isPhotoFigcaption: false,
  islazyload: false,
  isanchor: false
}</script><script id="config-diff">var GLOBAL_CONFIG_SITE = { 
  isPost: true,
  isHome: false,
  isHighlightShrink: false,
  isToc: true,
  postUpdate: '2021-03-26 17:09:07'
}</script><noscript><style type="text/css">
  #nav {
    opacity: 1
  }
  .justified-gallery img {
    opacity: 1
  }

  #recent-posts time,
  #post-meta time {
    display: inline !important
  }
</style></noscript><script>(win=>{
    win.saveToLocal = {
      set: function setWithExpiry(key, value, ttl) {
        if (ttl === 0) return
        const now = new Date()
        const expiryDay = ttl * 86400000
        const item = {
          value: value,
          expiry: now.getTime() + expiryDay,
        }
        localStorage.setItem(key, JSON.stringify(item))
      },

      get: function getWithExpiry(key) {
        const itemStr = localStorage.getItem(key)

        if (!itemStr) {
          return undefined
        }
        const item = JSON.parse(itemStr)
        const now = new Date()

        if (now.getTime() > item.expiry) {
          localStorage.removeItem(key)
          return undefined
        }
        return item.value
      }
    }
  
    win.getScript = url => new Promise((resolve, reject) => {
      const script = document.createElement('script')
      script.src = url
      script.async = true
      script.onerror = reject
      script.onload = script.onreadystatechange = function() {
        const loadState = this.readyState
        if (loadState && loadState !== 'loaded' && loadState !== 'complete') return
        script.onload = script.onreadystatechange = null
        resolve()
      }
      document.head.appendChild(script)
    })
  
      win.activateDarkMode = function () {
        document.documentElement.setAttribute('data-theme', 'dark')
        if (document.querySelector('meta[name="theme-color"]') !== null) {
          document.querySelector('meta[name="theme-color"]').setAttribute('content', '#0d0d0d')
        }
      }
      win.activateLightMode = function () {
        document.documentElement.setAttribute('data-theme', 'light')
        if (document.querySelector('meta[name="theme-color"]') !== null) {
          document.querySelector('meta[name="theme-color"]').setAttribute('content', '#ffffff')
        }
      }
      const t = saveToLocal.get('theme')
    
          if (t === 'dark') activateDarkMode()
          else if (t === 'light') activateLightMode()
        
      const asideStatus = saveToLocal.get('aside-status')
      if (asideStatus !== undefined) {
        if (asideStatus === 'hide') {
          document.documentElement.classList.add('hide-aside')
        } else {
          document.documentElement.classList.remove('hide-aside')
        }
      }
    })(window)</script><meta name="generator" content="Hexo 5.4.0"></head><body><div id="loading-box"><div class="loading-left-bg"></div><div class="loading-right-bg"></div><div class="spinner-box"><div class="configure-border-1"><div class="configure-core"></div></div><div class="configure-border-2"><div class="configure-core"></div></div><div class="loading-word">加载中...</div></div></div><div id="sidebar"><div id="menu-mask"></div><div id="sidebar-menus"><div class="author-avatar"><img class="avatar-img" src="https://gitee.com/zuori/kaiscript/raw/master/images/20210325192506.jpg" onerror="onerror=null;src='/img/friend_404.gif'" alt="avatar"/></div><div class="site-data"><div class="data-item is-center"><div class="data-item-link"><a href="/zaker/archives/"><div class="headline">文章</div><div class="length-num">13</div></a></div></div><div class="data-item is-center"><div class="data-item-link"><a href="/zaker/tags/"><div class="headline">标签</div><div class="length-num">11</div></a></div></div><div class="data-item is-center"><div class="data-item-link"><a href="/zaker/categories/"><div class="headline">分类</div><div class="length-num">4</div></a></div></div></div><hr/><div class="menus_items"><div class="menus_item"><a class="site-page" href="/zaker/"><i class="fa-fw fas fa-home"></i><span> Home</span></a></div><div class="menus_item"><a class="site-page" href="/zaker/archives/"><i class="fa-fw fas fa-archive"></i><span> Archives</span></a></div><div class="menus_item"><a class="site-page" href="/zaker/tags/"><i class="fa-fw fas fa-tags"></i><span> Tags</span></a></div><div class="menus_item"><a class="site-page" href="/zaker/categories/"><i class="fa-fw fas fa-folder-open"></i><span> Categories</span></a></div></div></div></div><div class="post" id="body-wrap"><header class="post-bg" id="page-header" style="background-image: url('https://cdn.jsdelivr.net/npm/butterfly-extsrc@1/img/default.jpg')"><nav id="nav"><span id="blog_name"><a id="site-name" href="/zaker/">十三不知</a></span><div id="menus"><div class="menus_items"><div class="menus_item"><a class="site-page" href="/zaker/"><i class="fa-fw fas fa-home"></i><span> Home</span></a></div><div class="menus_item"><a class="site-page" href="/zaker/archives/"><i class="fa-fw fas fa-archive"></i><span> Archives</span></a></div><div class="menus_item"><a class="site-page" href="/zaker/tags/"><i class="fa-fw fas fa-tags"></i><span> Tags</span></a></div><div class="menus_item"><a class="site-page" href="/zaker/categories/"><i class="fa-fw fas fa-folder-open"></i><span> Categories</span></a></div></div><div id="toggle-menu"><a class="site-page"><i class="fas fa-bars fa-fw"></i></a></div></div></nav><div id="post-info"><h1 class="post-title">RabbitMq学习笔记</h1><div id="post-meta"><div class="meta-firstline"><span class="post-meta-date"><i class="far fa-calendar-alt fa-fw post-meta-icon"></i><span class="post-meta-label">发表于</span><time class="post-meta-date-created" datetime="2021-03-26T01:46:36.000Z" title="发表于 2021-03-26 09:46:36">2021-03-26</time><span class="post-meta-separator">|</span><i class="fas fa-history fa-fw post-meta-icon"></i><span class="post-meta-label">更新于</span><time class="post-meta-date-updated" datetime="2021-03-26T09:09:07.587Z" title="更新于 2021-03-26 17:09:07">2021-03-26</time></span><span class="post-meta-categories"><span class="post-meta-separator">|</span><i class="fas fa-inbox fa-fw post-meta-icon"></i><a class="post-meta-categories" href="/zaker/categories/JAVA/">JAVA</a></span></div><div class="meta-secondline"><span class="post-meta-separator">|</span><span class="post-meta-pv-cv" id="" data-flag-title="RabbitMq学习笔记"><i class="far fa-eye fa-fw post-meta-icon"></i><span class="post-meta-label">阅读量:</span><span id="busuanzi_value_page_pv"></span></span></div></div></div></header><main class="layout" id="content-inner"><div id="post"><article class="post-content" id="article-container"><h1>RabbitMq学习笔记</h1>
<h2 id="RabbitMQ简介">RabbitMQ简介</h2>
<p><strong>RabbitMQ</strong> 是一个由 Erlang 语言开发的 AMQP协议 的开源实现，用来通过普通协议在完全不同的应用之间完成数据共享。</p>
<p><strong>Erlang语言</strong>最初在于交换机领域的架构模式，这样使得 RabbitMQ在Broker之间进行数据交互的性能是非常优秀的</p>
<p>Erlang的优点：<strong>Erlang有着和原生Socket—样的延迟</strong></p>
<p><strong>AMQP</strong> ： Advanced Message Queue，高级消息队列协议。它是应用层协议的一个开放标准，为面向消息的中间件设计，基于此协议的客户端与消息中间件可传递消息，并不受产品、开发语言等条件的限制。</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200418173114.png" alt=""></p>
<p>RabbitMQ 最初起源于金融系统，用于在分布式系统中存储转发消息，在易用性、扩展性、高可用性等方面表现不俗。具体特点包括：</p>
<ol>
<li>可靠性（Reliability） ： RabbitMQ 使用一些机制来保证可靠性，如持久化、传输确认、发布<br>
确认。</li>
<li>灵活的路由（Flexible Routing） ： 在消息进入队列之前，通过 Exchange 来路由消息的。对<br>
于典型的路由功能， RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路<br>
由功能，可以将多个 Exchange 绑定在一起，也通过插件机制实现自己的 Exchange 。</li>
<li>消息集群（Clustering） ： 多个 RabbitMQ 服务器可以组成一个集群，形成一个逻辑 Broker ，并RabbitMQ的集群模式比较丰富，提供了表达式的配置，非常简单，常见的集群模式有HA模式、镜像队列模型、主从模式、远程模式等等，最常用的就是镜像队列模式。</li>
<li>高可用（Highly Available Queues） ： 队列可以在集群中的机器上进行镜像， 使得在部分节点出问题的情况下队列仍然可用。</li>
<li>多种协议（Multi-protocol） ： RabbitMQ 支持多种消息队列协议，比如 STOMP、 MQTT等等。</li>
<li>多语言客户端（Many Clients） ： RabbitMQ 几乎支持所有常用语言，比如 Java、 .NET、Ruby 等等，比如在JAVA中与SpringAMQP完美整合，API相当丰富。</li>
<li>管理界面（Management UI） :RabbitMQ 提供了一个易用的用户界面，使得用户可以监控和管理消息 Broker 的许多方面。</li>
<li>跟踪机制（Tracing） :如果消息异常， RabbitMQ 提供了消息跟踪机制，使用者可以找出发生了什么。</li>
<li>插件机制（Plugin System） :RabbitMQ 提供了许多插件，来从多方面进行扩展，也可以编<br>
写自己的插件。</li>
</ol>
<h3 id="为什么要使用RabbitMQ">为什么要使用RabbitMQ</h3>
<p>近两年谈的很多的一个概念<strong>微服务</strong>，在一个大型业务系统架构中，会被拆分成很多小的业务系统，这些业务系统之间如何建立通信呢？大家熟知的 HTTP、RPC 可以实现不同系统、不同语言之间的通信，除了这些往往还会使用消息队列（RabbitMQ、ActiveMQ、Kafafa 等）将这些系统链接起来，达到各系<strong>统间的解耦</strong>。</p>
<h3 id="RabbitMQ-应用场景">RabbitMQ 应用场景</h3>
<p><strong>同步转异步</strong></p>
<p>在项目中对于一些没必要同步处理的，可以借助 MQ 进行异步处理，比如与其他系统之间同步数据，可以使用MQ发布-订阅的方式完成异步的消息同步</p>
<p><strong>应用解耦</strong></p>
<p>例如商城业务场景中，订单系统与库存系统，下单的同步可能也要去减少库存，将原本耦合在一块的逻辑可以通过消息队列进行，订单系统发布消息，库存系统订阅消息，这样的好处是一般库存系统出现问题也不会影响到订单系统。</p>
<p><strong>流量削峰</strong></p>
<p>流量削峰在一些营销活动、秒杀活动场景中应用还是比较广泛的，如果短时间流量过大，可以通过设置阀值丢弃掉一部分消息或者根据服务的承受能力设置处理消息限制，也就是限流。</p>
<h3 id="MQ-的空间与时间解耦">MQ 的空间与时间解耦</h3>
<p>从空间上来看，消息的生产者无需提前知道消费者的存在，反之消费者亦是，两者之间得到了解耦，不会强依赖，从而实现<strong>空间上的解耦</strong>。</p>
<p>从时间上来看，消息的生产者只负责生产数据将数据放入队列，之后无需关心消费者什么时间去消费，消费则可以根据自己的业务需要来选择实时消费还是延迟消费，两者都拥有了自己的生命周期，从而实现了<strong>时间上的解耦</strong>。</p>
<h3 id="主流消息中间件一览">主流消息中间件一览</h3>
<ul>
<li><strong>ActiveMQ</strong>：Apache 出品，早起很流行主要应用于中小企业，面对大量并发场景会有阻塞、消息堆积问题。</li>
<li><strong>Kafka</strong>：是由 Apache 软件基金会开发的一个开源流处理平台，由 Scala 和 Java 编写，是一种高吞吐量的分布式发布订阅消息系统，支持单机每秒百万并发。最开始目的主要用于大数据方向日志收集、传输。0.8 版本开始支持复制，不支持事物，因此对消息的重复、丢失、错误没有严格的要求。</li>
<li><strong>RocketMQ</strong>：阿里开源的消息中间件，是一款低延迟、高可靠、可伸缩、易于使用的消息中间件，思路起源于 Kafka。最大的问题商业版收费，有些功能不开放。</li>
<li><strong>RabbitMQ</strong>：是一个由 erlang（有着和原生 Socket 一样低的延迟）语言开发基于 AMQP 协议的开源消息队列系统。能保证消息的可靠性、稳定性、安全性。</li>
</ul>
<h2 id="RabbitMQ架构">RabbitMQ架构</h2>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200418171347.png" alt=""></p>
<p>大概意思就是Rabbit MQ拿到消息之后，会先交给 交换机 （Exchange）, 然后交换机再根据预先设定的不同绑定( Bindings )策略，来确定要发给哪个队列。</p>
<h2 id="RabbitMQ的安装与使用">RabbitMQ的安装与使用</h2>
<h3 id="RPM方式安装RabbitMQ">RPM方式安装RabbitMQ</h3>
<blockquote>
<p>安装与学习参考地址：[RabbitMQ官网]:(<a target="_blank" rel="noopener" href="https://www.rabbitmq.com/">https://www.rabbitmq.com/</a>)</p>
</blockquote>
<p>安装步骤：</p>
<ol>
<li>准备服务器环境，我这里用的虚拟机，常见的Centos7版本</li>
<li>下载RabbitMQ必须的安装包</li>
<li>进行安装，修改相关的配置文件</li>
</ol>
<h4 id="准备服务器环境">准备服务器环境</h4>
<p>安装好新的linux之后，系统初始化</p>
<figure class="highlight shell"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br><span class="line">55</span><br><span class="line">56</span><br></pre></td><td class="code"><pre><span class="line"><span class="meta">#</span><span class="bash">!/bin/bash</span></span><br><span class="line"><span class="meta">#</span><span class="bash">判断是否为root用户</span></span><br><span class="line">if [ `whoami` != &quot;root&quot; ];then</span><br><span class="line">echo &quot; only root can run it&quot;</span><br><span class="line">exit 1</span><br><span class="line">fi</span><br><span class="line"><span class="meta">#</span><span class="bash">执行前提示</span></span><br><span class="line">echo -e &quot;\033[31m 这是centos7系统初始化脚本，将更新系统内核至最新版本，请慎重运行！\033[0m&quot; </span><br><span class="line">read -s -n1 -p &quot;Press any key to continue or ctrl+C to cancel&quot;</span><br><span class="line">echo &quot;Your inputs: $REPLY&quot;</span><br><span class="line"><span class="meta">#</span><span class="bash">1.定义配置yum源的函数</span></span><br><span class="line">yum_config()&#123;</span><br><span class="line">yum -y install wget</span><br><span class="line">mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup</span><br><span class="line">wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo</span><br><span class="line">yum clean all &amp;&amp; yum makecache</span><br><span class="line">&#125;</span><br><span class="line"><span class="meta">#</span><span class="bash">2.定义配置NTP的函数</span></span><br><span class="line">ntp_config()&#123;</span><br><span class="line">yum -y install chrony</span><br><span class="line">systemctl start chronyd &amp;&amp; systemctl enable chronyd</span><br><span class="line">timedatectl set-timezone Asia/Shanghai &amp;&amp; timedatectl set-ntp yes</span><br><span class="line">&#125;</span><br><span class="line"><span class="meta">#</span><span class="bash">3.定义关闭防火墙的函数</span></span><br><span class="line">close_firewalld()&#123;</span><br><span class="line">systemctl stop firewalld.service &amp;&gt; /dev/null</span><br><span class="line">systemctl disable firewalld.service &amp;&gt; /dev/null</span><br><span class="line">&#125;</span><br><span class="line"><span class="meta">#</span><span class="bash">4.定义关闭selinux的函数</span></span><br><span class="line">close_selinux()&#123;</span><br><span class="line">setenforce 0</span><br><span class="line">sed -i &#x27;s/enforcing/disabled/g&#x27; /etc/selinux/config</span><br><span class="line">&#125;</span><br><span class="line"><span class="meta">#</span><span class="bash">5.定义安装常用工具的函数</span></span><br><span class="line">yum_tools()&#123;</span><br><span class="line">yum -y install  vim wget curl curl-devel bash-completion lsof iotop iostat unzip bzip2 bzip2-devel</span><br><span class="line">yum -y install  gcc gcc-c++ make cmake autoconf openssl-devel openssl-perl net-tools</span><br><span class="line">&#125;</span><br><span class="line"><span class="meta">#</span><span class="bash">6.定义升级最新内核的函数</span></span><br><span class="line">update_kernel ()&#123;</span><br><span class="line">rpm --import https://www.elrepo.org/RPM-GPG-KEY-elrepo.org</span><br><span class="line">rpm -Uvh http://www.elrepo.org/elrepo-release-7.0-3.el7.elrepo.noarch.rpm</span><br><span class="line">yum --enablerepo=elrepo-kernel install -y kernel-ml</span><br><span class="line">grub2-set-default 0</span><br><span class="line">grub2-mkconfig -o /boot/grub2/grub.cfg</span><br><span class="line">&#125;</span><br><span class="line"><span class="meta">#</span><span class="bash">执行脚本</span></span><br><span class="line">main()&#123;</span><br><span class="line">    yum_config;</span><br><span class="line">    ntp_config;</span><br><span class="line">    close_firewalld;</span><br><span class="line">    close_selinux;</span><br><span class="line">    yum_tools;</span><br><span class="line">    update_kernel;</span><br><span class="line">&#125;</span><br><span class="line">main</span><br></pre></td></tr></table></figure>
<p>安装rabbitmq所需的依赖</p>
<figure class="highlight shell"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz lsof</span><br></pre></td></tr></table></figure>
<h4 id="获取安装包">获取安装包</h4>
<p>rabbitmq和erlang安装包一定要对应，具体可以查看对应关系，官网有说明RabbitMQ Erlang Version Requirements</p>
<ul>
<li><strong>获取erlang安装包</strong></li>
</ul>
<figure class="highlight shell"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">sudo wget http://www.rabbitmq.com/releases/erlang/erlang-18.3-1.el6.x86_64.rpm</span><br></pre></td></tr></table></figure>
<ul>
<li><strong>获取socat安装包</strong></li>
</ul>
<p>socat支持多协议，用于协议处理、端口转发，rabbitmq依赖于此。</p>
<figure class="highlight shell"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">sudo wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm</span><br></pre></td></tr></table></figure>
<ul>
<li><strong>获取rabbitmq-server安装包</strong> rabbitmq-server <code>安装包列表</code></li>
</ul>
<figure class="highlight shell"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">sudo wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm</span><br></pre></td></tr></table></figure>
<h4 id="开始安装">开始安装</h4>
<ul>
<li><strong>Centos rpm 一键安装</strong></li>
</ul>
<p>这里采用rpm一键安装，centos 执行命令</p>
<figure class="highlight shell"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br></pre></td><td class="code"><pre><span class="line">rpm -ivh erlang-18.3-1.el6.x86_64.rpm</span><br><span class="line">rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm</span><br><span class="line">rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm</span><br></pre></td></tr></table></figure>
<p><strong>修改配置文件</strong></p>
<figure class="highlight shell"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br></pre></td><td class="code"><pre><span class="line">vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app</span><br><span class="line"></span><br><span class="line">&#123;loopback_users, [&lt;&lt;&quot;guest&quot;&gt;&gt;]&#125;, // 修改为 &#123;loopback_users, [guest]&#125;,</span><br></pre></td></tr></table></figure>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200510175145.png" alt=""></p>
<h4 id="运行与启动">运行与启动</h4>
<ul>
<li><strong>开启 rabbitmq</strong></li>
</ul>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">rabbitmqctl start_app</span><br></pre></td></tr></table></figure>
<p>在这里我启动的时候发生了一个错误：</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">Error: unable to connect to node rabbit@localhost: nodedown</span><br></pre></td></tr></table></figure>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200511214557.png" alt=""></p>
<p>解决步骤：</p>
<figure class="highlight shell"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br></pre></td><td class="code"><pre><span class="line"><span class="meta">#</span><span class="bash">杀死 MQ的进程</span></span><br><span class="line">ps -ef | grep rabbitmq | grep -v grep | awk &#x27;&#123;print $2&#125;&#x27; | xargs kill -9</span><br><span class="line"><span class="meta">#</span><span class="bash">重启MQ服务</span></span><br><span class="line">rabbitmq-server -detached</span><br><span class="line"><span class="meta">#</span><span class="bash">然后再启动rabbitMq应用</span></span><br><span class="line">rabbitmqctl start_app</span><br><span class="line"><span class="meta">#</span><span class="bash">查看rabbitmq状态</span></span><br><span class="line">rabbitmqctl status</span><br></pre></td></tr></table></figure>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200511215056.png" alt=""></p>
<p>已经启动成功！</p>
<ul>
<li><strong>开启管理插件</strong></li>
</ul>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">rabbitmq-plugins enable rabbitmq_management</span><br></pre></td></tr></table></figure>
<ul>
<li><strong>检查状态</strong></li>
</ul>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br></pre></td><td class="code"><pre><span class="line">$ lsof  -i:5672</span><br></pre></td></tr></table></figure>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200510175827.png" alt=""></p>
<p><strong>开启管理通知台</strong> 终端更多操作命令，以下有说明，浏览区输入 <code>http://host:15672</code> 打开管理控制台</p>
<ul>
<li>
<p><strong>几个端口区别说明</strong></p>
</li>
<li>
<ul>
<li><code>5672</code>：通信默认端口号</li>
<li><code>15672</code>：管理控制台默认端口号</li>
<li><code>25672</code>：集群通信端口号</li>
</ul>
</li>
<li>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200511005636.png" alt=""></p>
</li>
</ul>
<p>输入guest /  guest进入管理后台</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200510180836.png" alt=""></p>
<p>在这个界面里面我们可以做些什么？<br>
可以手动创建虚拟host，创建用户，分配权限，创建交换机，创建队列等等，还有查看队列消息，消费效率，推送效率等等</p>
<h4 id="管理界面常见操作">管理界面常见操作</h4>
<h5 id="配置Virtual-hosts"><strong>配置Virtual hosts</strong></h5>
<p>登入管理界面，Admin窗口，就可以创建Virtual host，</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200516145242.png" alt=""></p>
<p>可以看到，如果没有创建虚拟机的话原本默认时会有一个名为 / 的虚拟机，绑定的是guest用户，刚创建的Virtual host都是没有绑定用户的，接下来创建一个测试用户进行绑定</p>
<h5 id="创建用户"><strong>创建用户</strong></h5>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200516150440.png" alt=""></p>
<p>可以看到在创建用户的时候，最后那里有一步设置tags，就是设置用户的角色，不同的角色有不同的权限。</p>
<p>RabbitMQ的用户角色分类：</p>
<p>none、management、policymaker、monitoring、administrator</p>
<ul>
<li>none ：啥管理权限都没有</li>
<li>management:  用户可以访问管理插件</li>
<li>policymaker :  用户可以访问管理插件，并管理他们有权访问的vhost的策略和参数</li>
<li>monitoring :   用户可以访问管理插件，查看所有连接和通道以及与节点相关的信息。</li>
<li>administrator :  用户可以做任何监视可以做的事情，管理用户，vhost和权限，关闭其他用户的连接，并管理所有vhost的政策和参数。</li>
</ul>
<p>创建完用户之后，可以看到用户也没没有绑定虚拟机，可以点进用户的属性界面，选择刚才创建的虚拟机，进行虚拟机的绑定以及权限设置</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200516151420.png" alt=""></p>
<p>大概的操作就先到这里，后面写实例代码的时候有创建quene和exchange的步骤。</p>
<h4 id="操作命令">操作命令</h4>
<p>以下列举一些在终端常用的操作命令</p>
<ul>
<li>whereis rabbitmq：查看 rabbitmq 安装位置</li>
<li>rabbitmqctl start_app：启动应用</li>
<li>whereis erlang：查看erlang安装位置</li>
<li>rabbitmqctl start_app：启动应用</li>
<li>rabbitmqctl stop_app：关闭应用</li>
<li>rabbitmqctl status：节点状态</li>
<li>rabbitmqctl add_user username password：添加用户</li>
<li>rabbitmqctl list_users：列出所有用户</li>
<li>rabbitmqctl delete_user username：删除用户</li>
<li>rabbitmqctl add_vhost vhostpath：创建虚拟主机</li>
<li>rabbitmqctl list_vhosts：列出所有虚拟主机</li>
<li>rabbitmqctl list_queues：查看所有队列</li>
<li>rabbitmqctl -p vhostpath purge_queue blue：清除队列里消息</li>
</ul>
<h3 id="Docker安装RabbitMQ">Docker安装RabbitMQ</h3>
<p>使用docker安装rabbitmq不用那么复杂，直接修改下上面的虚拟机初始化脚本，加两个方法，分别是安装docker和安装rabbitmq的方法：</p>
<figure class="highlight shell"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br><span class="line">55</span><br><span class="line">56</span><br><span class="line">57</span><br><span class="line">58</span><br><span class="line">59</span><br><span class="line">60</span><br><span class="line">61</span><br><span class="line">62</span><br><span class="line">63</span><br><span class="line">64</span><br><span class="line">65</span><br><span class="line">66</span><br><span class="line">67</span><br><span class="line">68</span><br><span class="line">69</span><br><span class="line">70</span><br><span class="line">71</span><br><span class="line">72</span><br><span class="line">73</span><br><span class="line">74</span><br><span class="line">75</span><br><span class="line">76</span><br><span class="line">77</span><br><span class="line">78</span><br><span class="line">79</span><br><span class="line">80</span><br><span class="line">81</span><br><span class="line">82</span><br><span class="line">83</span><br><span class="line">84</span><br><span class="line">85</span><br></pre></td><td class="code"><pre><span class="line"><span class="meta">#</span><span class="bash">!/bin/bash</span></span><br><span class="line"><span class="meta">#</span><span class="bash">判断是否为root用户</span></span><br><span class="line">if [ `whoami` != &quot;root&quot; ];then</span><br><span class="line">echo &quot; only root can run it&quot;</span><br><span class="line">exit 1</span><br><span class="line">fi</span><br><span class="line"><span class="meta">#</span><span class="bash">执行前提示</span></span><br><span class="line">echo -e &quot;\033[31m 这是centos7系统初始化脚本，将更新系统内核至最新版本，请慎重运行！\033[0m&quot; </span><br><span class="line">read -s -n1 -p &quot;Press any key to continue or ctrl+C to cancel&quot;</span><br><span class="line">echo &quot;Your inputs: $REPLY&quot;</span><br><span class="line"><span class="meta">#</span><span class="bash">1.定义配置yum源的函数</span></span><br><span class="line">yum_config()&#123;</span><br><span class="line">yum -y install wget</span><br><span class="line">mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup</span><br><span class="line">wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo</span><br><span class="line">yum clean all &amp;&amp; yum makecache</span><br><span class="line">&#125;</span><br><span class="line"><span class="meta">#</span><span class="bash">2.定义配置NTP的函数</span></span><br><span class="line">ntp_config()&#123;</span><br><span class="line">yum -y install chrony</span><br><span class="line">systemctl start chronyd &amp;&amp; systemctl enable chronyd</span><br><span class="line">timedatectl set-timezone Asia/Shanghai &amp;&amp; timedatectl set-ntp yes</span><br><span class="line">&#125;</span><br><span class="line"><span class="meta">#</span><span class="bash">3.定义关闭防火墙的函数</span></span><br><span class="line">close_firewalld()&#123;</span><br><span class="line">systemctl stop firewalld.service &amp;&gt; /dev/null</span><br><span class="line">systemctl disable firewalld.service &amp;&gt; /dev/null</span><br><span class="line">&#125;</span><br><span class="line"><span class="meta">#</span><span class="bash">4.定义关闭selinux的函数</span></span><br><span class="line">close_selinux()&#123;</span><br><span class="line">setenforce 0</span><br><span class="line">sed -i &#x27;s/enforcing/disabled/g&#x27; /etc/selinux/config</span><br><span class="line">&#125;</span><br><span class="line"><span class="meta">#</span><span class="bash">5.定义安装常用工具的函数</span></span><br><span class="line">yum_tools()&#123;</span><br><span class="line">yum -y install  vim wget curl curl-devel bash-completion lsof iotop iostat unzip bzip2 bzip2-devel</span><br><span class="line">yum -y install  gcc gcc-c++ make cmake autoconf openssl-devel openssl-perl net-tools</span><br><span class="line">&#125;</span><br><span class="line"><span class="meta">#</span><span class="bash">6.定义升级最新内核的函数</span></span><br><span class="line">update_kernel ()&#123;</span><br><span class="line">rpm --import https://www.elrepo.org/RPM-GPG-KEY-elrepo.org</span><br><span class="line">rpm -Uvh http://www.elrepo.org/elrepo-release-7.0-3.el7.elrepo.noarch.rpm</span><br><span class="line">yum --enablerepo=elrepo-kernel install -y kernel-ml</span><br><span class="line">grub2-set-default 0</span><br><span class="line">grub2-mkconfig -o /boot/grub2/grub.cfg</span><br><span class="line">&#125;</span><br><span class="line"></span><br><span class="line"><span class="meta">#</span><span class="bash">7. 安装docker</span></span><br><span class="line">install_docker()&#123;</span><br><span class="line">yum -y install  yum-utils device-mapper-persistent-data</span><br><span class="line">yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo</span><br><span class="line">yum -y install docker-ce</span><br><span class="line"></span><br><span class="line">if [ ! -f &quot;/etc/docker/daemon.json&quot; ]; then</span><br><span class="line">  echo &#x27;docker配置文件不存在，创建并设置加速镜像&#x27;</span><br><span class="line">  touch /etc/docker/daemon.json</span><br><span class="line">  echo &quot;&#123;\&quot;registry-mirrors\&quot;:[ \&quot;https://reg-mirror.qiniu.com\&quot;,\&quot;http://hub-mirror.c.163.com\&quot; ]&#125;&quot; &gt; /etc/docker/daemon.json</span><br><span class="line">  echo &#x27;镜像加速设置完毕&#x27;</span><br><span class="line">fi</span><br><span class="line"></span><br><span class="line">systemctl start docker</span><br><span class="line">systemctl enable docker</span><br><span class="line">echo &#x27;docker安装完毕！&#x27;</span><br><span class="line">&#125;</span><br><span class="line"><span class="meta">#</span><span class="bash">8. docker安装rabbitmq</span></span><br><span class="line">docker_install_rabbit()&#123;</span><br><span class="line"><span class="meta">#</span><span class="bash">指定版本，该版本包含了web控制页面</span></span><br><span class="line">docker pull rabbitmq:management</span><br><span class="line"><span class="meta">#</span><span class="bash">方式一：默认guest 用户，密码也是 guest</span></span><br><span class="line">docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management</span><br><span class="line"><span class="meta">#</span><span class="bash">方式二：设置用户名和密码</span></span><br><span class="line"><span class="meta">#</span><span class="bash">docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management</span></span><br><span class="line">&#125;</span><br><span class="line"><span class="meta">#</span><span class="bash">执行脚本</span></span><br><span class="line">main()&#123;</span><br><span class="line">    yum_config;</span><br><span class="line">    ntp_config;</span><br><span class="line">    close_firewalld;</span><br><span class="line">    close_selinux;</span><br><span class="line">    yum_tools;</span><br><span class="line">    update_kernel;</span><br><span class="line">	install_docker;</span><br><span class="line">	docker_install_rabbit;</span><br><span class="line">&#125;</span><br><span class="line">main</span><br></pre></td></tr></table></figure>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br></pre></td><td class="code"><pre><span class="line">&#123;</span><br><span class="line">  &quot;registry-mirrors&quot;: [</span><br><span class="line">    &quot;https:&#x2F;&#x2F;registry.docker-cn.com&quot;,</span><br><span class="line">    &quot;http:&#x2F;&#x2F;hub-mirror.c.163.com&quot;,</span><br><span class="line">    &quot;https:&#x2F;&#x2F;docker.mirrors.ustc.edu.cn&quot;</span><br><span class="line">  ]</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<p>如果是已有的环境，执行以下几个命令即可：</p>
<figure class="highlight shell"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br></pre></td><td class="code"><pre><span class="line"><span class="meta">#</span><span class="bash">指定版本，该版本包含了web控制页面</span></span><br><span class="line">docker pull rabbitmq:management</span><br><span class="line"><span class="meta">#</span><span class="bash">方式一：默认guest 用户，密码也是 guest</span></span><br><span class="line">docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management</span><br><span class="line"><span class="meta">#</span><span class="bash">方式二：设置用户名和密码</span></span><br><span class="line">docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management</span><br></pre></td></tr></table></figure>
<h2 id="RabbitMQ的核心概念">RabbitMQ的核心概念</h2>
<h3 id="Broker"><strong>Broker</strong></h3>
<p>其实就是server，接受客户端的连接，表示消息队列服务器实体</p>
<h3 id="Connection"><strong>Connection</strong></h3>
<p>和具体的Broker的网络连接。</p>
<h3 id="Message">Message</h3>
<p>消息，消息是不具名的，它由消息头和消息体组成。消息体是不透明的，而消息头则由一系列的可选属性组成，这些属性包括 routing-key（路由键）、 priority（相对于其他消息的优先权）、 delivery-mode（指出该消息可能需要持久性存储）等。</p>
<h3 id="Channel"><strong>Channel</strong></h3>
<p>信道， 多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内地虚拟连接， AMQP 命令都是通过信道发出去的，不管是发布消息、订阅队列还是接收消息，这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销，所以引入了信道的概念，以复用一条 TCP 连接。客户端可以建立多个channel，每个channel表示一个会话任务。</p>
<h3 id="Publisher"><strong>Publisher</strong></h3>
<p>消息的生产者，也是一个向交换器发布消息的客户端应用程序。</p>
<h3 id="Exchange（将消息路由给队列-）"><strong>Exchange（将消息路由给队列 ）</strong></h3>
<p>交换器，用来接收生产者发送的消息并将这些消息路由给服务器中的队列。</p>
<h3 id="Binding（消息队列和交换器之间的关联）"><strong>Binding（消息队列和交换器之间的关联）</strong></h3>
<p>绑定，用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则，虚拟机根据他来确定如何路由 一条消息，所以可以将交换器理解成一个由绑定构成的路由表。</p>
<h3 id="Queue"><strong>Queue</strong></h3>
<p>消息队列，用来保存消息直到发送给消费者。它是消息的容器，也是消息的终点。 一个消息可投入一个或多个队列。消息一直在队列里面，等待消费者连接到这个队列将其取走 。</p>
<h3 id="Consumer"><strong>Consumer</strong></h3>
<p>消息的消费者，表示一个从消息队列中取得消息的客户端应用程序。</p>
<h3 id="Virtual-Host"><strong>Virtual Host</strong></h3>
<p>虚拟主机，表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。</p>
<h2 id="交换机的模式">交换机的模式</h2>
<p>交换机可以理解成具有路由表的路由程序，仅此而已。每个消息都有一个称为路由键（routing key）的属性，就是一个简单的字符串。RabbitMQ提供了四种Exchange模式：<strong>fanout</strong>,<strong>direct</strong>,<strong>topic</strong>,<strong>header</strong> 。 header模式在实际使用中较少，这里只说一下前三种模式.</p>
<h3 id="fanout">fanout</h3>
<p>fanout 模式就是<strong>广播模式</strong>，这种模式就是所有的消息来了会转发到所有绑定此交换机的队列上，这种模式转发消息是最快的。</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200516115033.png" alt=""></p>
<h3 id="Direct"><strong>Direct</strong></h3>
<p>Direct这个单词本意就是直接的意思，顾名思义，Direct 模式就是指定队列模式， 消息来了，只发给指定的 Queue, 其他Queue 都收不到，这里是将一个队列绑定到交换机上，要求该消息与一个特定的路由键完全匹配，一对一的匹配才会转发</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200516115249.png" alt=""></p>
<h3 id="Topic">Topic</h3>
<p>topic模式，又称主题模式，将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“ # ”匹配一个或多个词，符号“ * ”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.xyz”，但是 “ abc. * ”  只会匹配到“abc.def”,一定是规则匹配才会转发。</p>
<p>如图所示消息来源有： 美国新闻，美国天气，欧洲新闻，欧洲天气。<br>
如果你想看 <strong>美国</strong>主题： 那么就会收到 <strong>美国</strong>新闻，<strong>美国</strong>天气。<br>
如果你想看 <strong>新闻</strong>主题： 那么就会收到 美国<strong>新闻</strong>，欧洲<strong>新闻</strong>。<br>
如果你想看 <strong>天气</strong>主题： 那么就会收到 美国<strong>天气</strong>，欧洲<strong>天气</strong>。<br>
如果你想看 <strong>欧洲</strong>主题： 那么就会收到 <strong>欧洲</strong>新闻，<strong>欧洲</strong>天气。</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200516120515.png" alt=""></p>
<p><strong><u>一定记住，不管什么模式，所有的消息始终都是先发送到交换机，由交换机经过路由传送给队列，消费者再从队列中获取消息的！</u></strong></p>
<p><strong>所以说，生产者是面向交换机的，消费者是面向队列的</strong></p>
<h2 id="构建一个简单的生产者与消费者模型">构建一个简单的生产者与消费者模型</h2>
<p>生产者-消费者模型是指一方生产数据一方消费数据。两者之间会有一个缓冲区做为中介，生产者把数据放入缓冲区，消费者从缓冲区取出数据。另外，生产者消费者模式也是是面向过程编程其中的一种设计模式。</p>
<h3 id="构建生产者与消费者步骤">构建生产者与消费者步骤</h3>
<p>以下列举一下生产者与消费者模型在实现时的一些步骤，各语言在实现的过程中也都是大同小异的。</p>
<p><strong>生产者步骤</strong></p>
<ul>
<li>创建链接工厂</li>
<li>通过链接工厂创建链接</li>
<li>通过链接创建通道（channel）</li>
<li>通过 channel 发送数据</li>
<li>关闭链接</li>
</ul>
<p><strong>消费者步骤</strong></p>
<ul>
<li>创建链接工厂</li>
<li>通过链接工厂创建链接</li>
<li>通过链接创建通道（channel）</li>
<li>声明一个队列</li>
<li>创建消费者</li>
<li>设置 channel</li>
</ul>
<h3 id="生产者实例">生产者实例</h3>
<p>在开始写代码之前，可以先去创建一个可供消息存放的队列，其实也可以用代码去创建，这里界面操作可以熟悉并理解交换机，队列，路由之间的关系。</p>
<p><strong>创建新的queue</strong></p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200510233643.png" alt="创建消息队列"></p>
<p>输入要创建的队列名称。选择消息持久化到硬盘。</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">package</span> com.zkk.rabbitmq.producer.test;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> com.rabbitmq.client.Channel;</span><br><span class="line"><span class="keyword">import</span> com.rabbitmq.client.Connection;</span><br><span class="line"><span class="keyword">import</span> com.rabbitmq.client.ConnectionFactory;</span><br><span class="line"></span><br><span class="line"><span class="comment">/**</span></span><br><span class="line"><span class="comment"> * description</span></span><br><span class="line"><span class="comment"> *</span></span><br><span class="line"><span class="comment"> * <span class="doctag">@author</span> Zaker 2020/05/10 23:50</span></span><br><span class="line"><span class="comment"> */</span></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">Producer</span> </span>&#123;</span><br><span class="line">	<span class="function"><span class="keyword">public</span> <span class="keyword">static</span> <span class="keyword">void</span> <span class="title">main</span><span class="params">(String[] args)</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">		<span class="comment">// 1. 创建连接池实例</span></span><br><span class="line">		ConnectionFactory connectionFactory=<span class="keyword">new</span> ConnectionFactory();</span><br><span class="line">		connectionFactory.setHost(<span class="string">&quot;192.168.56.2&quot;</span>);</span><br><span class="line">		connectionFactory.setPort(<span class="number">5672</span>);</span><br><span class="line">		connectionFactory.setVirtualHost(<span class="string">&quot;/&quot;</span>);</span><br><span class="line">		<span class="comment">// 2. 通过连接池获取一个连接实例</span></span><br><span class="line">		Connection connection=connectionFactory.newConnection();</span><br><span class="line">		<span class="comment">//3. 通过连接获取连接通道</span></span><br><span class="line">		Channel channel=connection.createChannel();</span><br><span class="line">		<span class="comment">// 4. 通过 channel 发送数据</span></span><br><span class="line">		<span class="comment">/**</span></span><br><span class="line"><span class="comment">		 * 这里basicPublish（）方法的参数可以说明一下</span></span><br><span class="line"><span class="comment">		 * 	(1)String exchange -- 交换机名称，如果不传默认为 AMQP default</span></span><br><span class="line"><span class="comment">		 *  (2) String routingKey -- 路由关键字 ,如果没有交换机直接输入队列名称</span></span><br><span class="line"><span class="comment">		 *  (3)BasicProperties props -- 消息的基本属性，例如路由头等</span></span><br><span class="line"><span class="comment">		 *  (4)byte[] body -- 消息体</span></span><br><span class="line"><span class="comment">		 */</span></span><br><span class="line">		channel.basicPublish(<span class="string">&quot;&quot;</span>, <span class="string">&quot;hello-queue&quot;</span>, <span class="keyword">null</span>, <span class="string">&quot;hello world&quot;</span>.getBytes());</span><br><span class="line">		<span class="comment">// 5. 关闭连接</span></span><br><span class="line">		channel.close();</span><br><span class="line">		connection.close();</span><br><span class="line">	&#125;</span><br><span class="line">&#125;</span><br><span class="line"></span><br></pre></td></tr></table></figure>
<p>执行一下生产者的方法，执行成功后查看管理界面，看queue</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200510235155.png" alt=""></p>
<p>可以很清晰的看到队列中有一条未被处理的消息。说明生产者以及将消息推送到队列中，等待消费者进行消费，然后可以点击队列名称，进入队列详情中拉到下面选项中有一个GetMessage的选项，点击Get Message，这里可以看到消息的具体内容，但不会将消息进行消费。</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200510235444.png" alt=""></p>
<p>然后下面看一下队列与交换机的绑定形式，首先，创建一个交换机</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200511000145.png" alt=""></p>
<p>交换机创建好之后，点击交换机名称，将交换机和队列进行绑定，在这里注意指定Routing Key的时候要与代码中指定的 Routing Key要一致。<strong>在生产端，消息的发送是面向交换机的，先发送到交换机，交换机通过Routing Key去匹配对应的队列，如果匹配到了，才会把消息最终放入对应的消息队列中去</strong></p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200511000330.png" alt=""></p>
<p>其中Routing Key的写法可以使用模糊匹配的格式</p>
<p>这里写的**hello.#<strong>的意思就是所有以</strong>hello.<strong>开头的路由都会被匹配到,其实还有</strong>hell.<strong>的写法，只不过*只能匹配到后续的一个  <strong>.</strong>  的模糊匹配，如果是hello.ab 就可以匹配到，但是如果是hello.ab.cd就匹配不到了，但是</strong>hello.#**可以模糊匹配多组。</p>
<p>接下来可以修改一下最开始代码，填充上交换机的参数进行测试：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">package</span> com.zkk.rabbitmq.producer.test;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> com.rabbitmq.client.Channel;</span><br><span class="line"><span class="keyword">import</span> com.rabbitmq.client.Connection;</span><br><span class="line"><span class="keyword">import</span> com.rabbitmq.client.ConnectionFactory;</span><br><span class="line"></span><br><span class="line"><span class="comment">/**</span></span><br><span class="line"><span class="comment"> * description</span></span><br><span class="line"><span class="comment"> *</span></span><br><span class="line"><span class="comment"> * <span class="doctag">@author</span> Zaker 2020/05/10 22:02</span></span><br><span class="line"><span class="comment"> */</span></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">Producer</span> </span>&#123;</span><br><span class="line">	<span class="function"><span class="keyword">public</span> <span class="keyword">static</span> <span class="keyword">void</span> <span class="title">main</span><span class="params">(String[] args)</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">		<span class="comment">// 1. 创建连接池实例</span></span><br><span class="line">		ConnectionFactory connectionFactory=<span class="keyword">new</span> ConnectionFactory();</span><br><span class="line">		connectionFactory.setHost(<span class="string">&quot;192.168.56.2&quot;</span>);</span><br><span class="line">		connectionFactory.setPort(<span class="number">5672</span>);</span><br><span class="line">		connectionFactory.setVirtualHost(<span class="string">&quot;/&quot;</span>);</span><br><span class="line">		<span class="comment">// 2. 通过连接池获取一个连接实例</span></span><br><span class="line">		Connection connection=connectionFactory.newConnection();</span><br><span class="line">		<span class="comment">//3. 通过连接获取连接通道</span></span><br><span class="line">		Channel channel=connection.createChannel();</span><br><span class="line">		<span class="comment">// 4. 通过 channel 发送数据</span></span><br><span class="line">		<span class="comment">/**</span></span><br><span class="line"><span class="comment">		 * 这里basicPublish（）方法的参数可以说明一下</span></span><br><span class="line"><span class="comment">		 * 	(1)String exchange -- 交换机名称，如果不传默认为 AMQP default</span></span><br><span class="line"><span class="comment">		 *  (2) String routingKey -- 路由关键字 ,如果没有交换机直接输入队列名称</span></span><br><span class="line"><span class="comment">		 *  (3)BasicProperties props -- 消息的基本属性，例如路由头等</span></span><br><span class="line"><span class="comment">		 *  (4)byte[] body -- 消息体</span></span><br><span class="line"><span class="comment">		 */</span></span><br><span class="line">		channel.basicPublish(<span class="string">&quot;hello-exchange&quot;</span>, <span class="string">&quot;hello.ab&quot;</span>, <span class="keyword">null</span>, <span class="string">&quot;hello RabbitMQ&quot;</span>.getBytes());</span><br><span class="line">		<span class="comment">// 5. 关闭连接</span></span><br><span class="line">		channel.close();</span><br><span class="line">		connection.close();</span><br><span class="line">	&#125;</span><br><span class="line">&#125;</span><br><span class="line"></span><br></pre></td></tr></table></figure>
<p>执行生产端方法后查看queue，可以看到本条消息也已经推送到队列中了：</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200511001555.png" alt=""></p>
<p>关于RabbitMQ的生产者大概就是这样，下来看一下消费者的使用</p>
<h3 id="消费者实例">消费者实例</h3>
<p>通过上面步骤哪里可以看到，消费者和生产者前几步其实都是差不多的，因为消息队列等在生产者的时候都已经设置好了，所以可以直接写代码：</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br><span class="line">55</span><br><span class="line">56</span><br><span class="line">57</span><br><span class="line">58</span><br><span class="line">59</span><br><span class="line">60</span><br><span class="line">61</span><br><span class="line">62</span><br><span class="line">63</span><br><span class="line">64</span><br><span class="line">65</span><br><span class="line">66</span><br><span class="line">67</span><br><span class="line">68</span><br><span class="line">69</span><br><span class="line">70</span><br><span class="line">71</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">package</span> com.zkk.rabbitmq.producer.test;</span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> com.rabbitmq.client.*;</span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> java.io.IOException;</span><br><span class="line"><span class="keyword">import</span> java.io.UnsupportedEncodingException;</span><br><span class="line"></span><br><span class="line"><span class="comment">/**</span></span><br><span class="line"><span class="comment"> * description</span></span><br><span class="line"><span class="comment"> *</span></span><br><span class="line"><span class="comment"> * <span class="doctag">@author</span> Zaker 2020/05/11 0:40</span></span><br><span class="line"><span class="comment"> */</span></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">Consumer</span> </span>&#123;</span><br><span class="line">	<span class="function"><span class="keyword">public</span> <span class="keyword">static</span> <span class="keyword">void</span> <span class="title">main</span><span class="params">(String[] args)</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">		<span class="comment">// 1. 创建连接池实例</span></span><br><span class="line">		ConnectionFactory connectionFactory=<span class="keyword">new</span> ConnectionFactory();</span><br><span class="line">		connectionFactory.setHost(<span class="string">&quot;192.168.56.2&quot;</span>);</span><br><span class="line">		connectionFactory.setPort(<span class="number">5672</span>);</span><br><span class="line">		connectionFactory.setVirtualHost(<span class="string">&quot;/&quot;</span>);</span><br><span class="line">		<span class="comment">// 2. 通过连接池获取一个连接实例</span></span><br><span class="line">		Connection connection=connectionFactory.newConnection();</span><br><span class="line">		<span class="comment">//3. 通过连接获取连接通道</span></span><br><span class="line">		Channel channel=connection.createChannel();</span><br><span class="line">		<span class="comment">//4. 声明一个队列</span></span><br><span class="line">		String queueName=<span class="string">&quot;hello-queue&quot;</span>;</span><br><span class="line">		channel.queueDeclare(queueName, <span class="keyword">true</span>, <span class="keyword">false</span>, <span class="keyword">false</span>, <span class="keyword">null</span>);</span><br><span class="line">		<span class="comment">//5. 创建消费者</span></span><br><span class="line">		DefaultConsumer consumer=<span class="keyword">new</span> DefaultConsumer(channel)&#123;</span><br><span class="line">			<span class="meta">@Override</span></span><br><span class="line">			<span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">handleDelivery</span><span class="params">(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,</span></span></span><br><span class="line"><span class="function"><span class="params">									   <span class="keyword">byte</span>[] body)</span> <span class="keyword">throws</span> IOException </span>&#123;</span><br><span class="line">				<span class="keyword">super</span>.handleDelivery(consumerTag, envelope, properties, body);</span><br><span class="line"></span><br><span class="line">				String message = <span class="keyword">null</span>;</span><br><span class="line">				<span class="keyword">try</span> &#123;</span><br><span class="line">					message = <span class="keyword">new</span> String(body, <span class="string">&quot;UTF-8&quot;</span>);</span><br><span class="line">					System.out.println(<span class="string">&quot; [x] Received &#x27;&quot;</span> + message + <span class="string">&quot;&#x27;&quot;</span>);</span><br><span class="line">					System.out.printf(<span class="string">&quot;in consumer B (delivery tag is %d): %s\n&quot;</span>, envelope.getDeliveryTag(), message);</span><br><span class="line">					<span class="comment">//自动签收</span></span><br><span class="line">					<span class="comment">/*</span></span><br><span class="line"><span class="comment">					* deliveryTag:该消息的index</span></span><br><span class="line"><span class="comment">					*multiple：是否批量.true:将一次性ack所有小于deliveryTag的消息。</span></span><br><span class="line"><span class="comment">					* */</span></span><br><span class="line">					channel.basicAck(envelope.getDeliveryTag(), <span class="keyword">false</span>);</span><br><span class="line">				&#125; <span class="keyword">catch</span> (UnsupportedEncodingException e) &#123;</span><br><span class="line">					e.printStackTrace();</span><br><span class="line">					<span class="comment">/**</span></span><br><span class="line"><span class="comment">					 * 自动拒签</span></span><br><span class="line"><span class="comment">					 * deliveryTag:该消息的index</span></span><br><span class="line"><span class="comment">					 * multiple：是否批量.true:将一次性拒绝所有小于deliveryTag的消息。</span></span><br><span class="line"><span class="comment">					 * requeue：被拒绝的是否重新入队列</span></span><br><span class="line"><span class="comment">					 *</span></span><br><span class="line"><span class="comment">					 * 其实还有channel.basicReject方法可以拒签</span></span><br><span class="line"><span class="comment">					 * channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息，而basicReject一次只能拒绝一条消息</span></span><br><span class="line"><span class="comment">					 */</span></span><br><span class="line">					channel.basicNack(envelope.getDeliveryTag(),<span class="keyword">false</span>,<span class="keyword">true</span>);</span><br><span class="line">				&#125;</span><br><span class="line"></span><br><span class="line">			&#125;</span><br><span class="line">		&#125;;</span><br><span class="line"></span><br><span class="line">		<span class="comment">/**</span></span><br><span class="line"><span class="comment">		 * 设置channel</span></span><br><span class="line"><span class="comment">		 * autoAck：是否自动ack，如果不自动ack，</span></span><br><span class="line"><span class="comment">		 * 需要使用channel.basicAck、channel.basicNack、channel.basicReject 进行消息应答</span></span><br><span class="line"><span class="comment">		 */</span></span><br><span class="line">		channel.basicConsume(queueName, <span class="keyword">false</span>, consumer);</span><br><span class="line"></span><br><span class="line">	&#125;</span><br><span class="line">&#125;</span><br><span class="line"></span><br></pre></td></tr></table></figure>
<p>看打印结果可以看到消息消费成功：</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200511004349.png" alt=""></p>
<p>再去看MQ的管理界面，查看queue，可以看到消息已经被消费完成。</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200511004616.png" alt=""></p>
<p>在消费者的代码中，因为没有关闭channel，所以消费者一直在启动着对队列的监听，需要去手动执行关闭程序才会停止，这个时候如果在消费者再发送一次消息，执行一下消费者，可以看到新生产的消息已经被消费成功。</p>
<p><img src="https://gitee.com/zuori/kaiscript/raw/master/images/20200511005124.png" alt=""></p>
<p>这样一个最简单的生产者与消费者的DEMO就完成了。</p>
<h2 id="SpringBoot整合RabbitMQ">SpringBoot整合RabbitMQ</h2>
<p>这次的实例我创建了两个SpringBoot工程，一个是rabbitmq-producer（生产者），另一个就是rabbitmq-consumer(消费者）</p>
<h3 id="rabbitmq-producer（生产者）">rabbitmq-producer（生产者）</h3>
<p><strong>POM文件</strong>：</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br></pre></td><td class="code"><pre><span class="line">&lt;dependency&gt;</span><br><span class="line">    &lt;groupId&gt;org.springframework.boot&lt;&#x2F;groupId&gt;</span><br><span class="line">    &lt;artifactId&gt;spring-boot-starter-web&lt;&#x2F;artifactId&gt;</span><br><span class="line">&lt;&#x2F;dependency&gt;</span><br><span class="line"></span><br><span class="line">&lt;!--rabbitmq--&gt;</span><br><span class="line">&lt;dependency&gt;</span><br><span class="line">    &lt;groupId&gt;org.springframework.boot&lt;&#x2F;groupId&gt;</span><br><span class="line">    &lt;artifactId&gt;spring-boot-starter-amqp&lt;&#x2F;artifactId&gt;</span><br><span class="line">&lt;&#x2F;dependency&gt;</span><br></pre></td></tr></table></figure>
<p><strong>application.yml</strong>:</p>
<figure class="highlight yml"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br></pre></td><td class="code"><pre><span class="line"><span class="attr">spring:</span></span><br><span class="line">  <span class="attr">datasource:</span></span><br><span class="line">      <span class="attr">url:</span> <span class="string">jdbc:mysql://192.168.56.2:3306/hodr_system?useUnicode=true&amp;characterEncoding=utf-8&amp;useSSL=false&amp;serverTimezone=UTC</span></span><br><span class="line">      <span class="attr">username:</span> <span class="string">root</span></span><br><span class="line">      <span class="attr">password:</span> <span class="number">123456</span></span><br><span class="line">  <span class="comment">#项目名称</span></span><br><span class="line">  <span class="attr">application:</span></span><br><span class="line">    <span class="attr">name:</span> <span class="string">rabbitmq-producer</span></span><br><span class="line">  <span class="comment">#配置rabbitMq 服务器</span></span><br><span class="line">  <span class="attr">rabbitmq:</span></span><br><span class="line"><span class="comment">#    host: 192.168.56.2</span></span><br><span class="line"><span class="comment">#    port: 5672</span></span><br><span class="line">    <span class="attr">addresses:</span> <span class="number">192.168</span><span class="number">.56</span><span class="number">.2</span><span class="string">:5672</span></span><br><span class="line">    <span class="attr">username:</span> <span class="string">guest</span></span><br><span class="line">    <span class="attr">password:</span> <span class="string">guest</span></span><br><span class="line">    <span class="comment">#虚拟host 可以不设置,默认是 /</span></span><br><span class="line">    <span class="attr">virtual-host:</span> <span class="string">/</span></span><br><span class="line">    <span class="attr">durable:</span> <span class="literal">true</span></span><br><span class="line">    <span class="attr">auto-delete:</span> <span class="literal">false</span></span><br><span class="line"></span><br><span class="line"></span><br><span class="line">  <span class="attr">jackson:</span></span><br><span class="line">    <span class="attr">date-format:</span> <span class="string">java.text.SimpleDateFormat</span></span><br><span class="line">    <span class="attr">time-zone:</span> <span class="string">GMT+8</span></span><br><span class="line">    <span class="attr">default-property-inclusion:</span> <span class="string">non_null</span></span><br><span class="line"></span><br><span class="line"><span class="meta">---</span></span><br><span class="line"></span><br><span class="line"><span class="attr">server:</span></span><br><span class="line">  <span class="attr">port:</span> <span class="number">8100</span></span><br><span class="line">  <span class="comment">#mybatis配置</span></span><br><span class="line"><span class="attr">mybatis:</span></span><br><span class="line">  <span class="attr">mapperLocations:</span> <span class="string">classpath*:/mapper/*.xml</span></span><br><span class="line">  <span class="attr">configuration:</span></span><br><span class="line">    <span class="attr">mapUnderscoreToCamelCase:</span> <span class="literal">true</span></span><br><span class="line">    <span class="attr">log-impl:</span> <span class="string">org.apache.ibatis.logging.stdout.StdOutImpl</span></span><br><span class="line">    </span><br></pre></td></tr></table></figure>
<p><strong>RabbitMqSendMessageService.java</strong></p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">package</span> com.zkk.rabbitmq.producer.rabbitmq;</span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> com.zkk.rabbitmq.producer.domain.Company;</span><br><span class="line"></span><br><span class="line"><span class="comment">/**</span></span><br><span class="line"><span class="comment"> * description  生产者发送消息接口</span></span><br><span class="line"><span class="comment"> *</span></span><br><span class="line"><span class="comment"> * <span class="doctag">@author</span> Zaker 2020/05/16 20:08</span></span><br><span class="line"><span class="comment"> */</span></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">interface</span> <span class="title">RabbitMqSendMessageService</span> </span>&#123;</span><br><span class="line">	<span class="comment">/**</span></span><br><span class="line"><span class="comment">	 *direct模式发送</span></span><br><span class="line"><span class="comment">	 *</span></span><br><span class="line"><span class="comment">	 * <span class="doctag">@param</span> company</span></span><br><span class="line"><span class="comment">	 * <span class="doctag">@author</span> Zaker 2020-05-16 22:10</span></span><br><span class="line"><span class="comment">	 * <span class="doctag">@return</span></span></span><br><span class="line"><span class="comment">	 */</span></span><br><span class="line">	<span class="function"><span class="keyword">void</span> <span class="title">directSend</span><span class="params">(Company company)</span></span>;</span><br><span class="line"></span><br><span class="line">	<span class="comment">/**</span></span><br><span class="line"><span class="comment">	 *topic模式发送</span></span><br><span class="line"><span class="comment">	 *</span></span><br><span class="line"><span class="comment">	 * <span class="doctag">@param</span> company</span></span><br><span class="line"><span class="comment">	 * <span class="doctag">@author</span> Zaker 2020-05-16 22:10</span></span><br><span class="line"><span class="comment">	 * <span class="doctag">@return</span></span></span><br><span class="line"><span class="comment">	 */</span></span><br><span class="line">	<span class="function"><span class="keyword">void</span> <span class="title">topicSend</span><span class="params">(Company company)</span></span>;</span><br><span class="line"></span><br><span class="line">	<span class="comment">/**</span></span><br><span class="line"><span class="comment">	 *fanout模式发送</span></span><br><span class="line"><span class="comment">	 *</span></span><br><span class="line"><span class="comment">	 * <span class="doctag">@param</span> company</span></span><br><span class="line"><span class="comment">	 * <span class="doctag">@author</span> Zaker 2020-05-16 22:10</span></span><br><span class="line"><span class="comment">	 * <span class="doctag">@return</span></span></span><br><span class="line"><span class="comment">	 */</span></span><br><span class="line">	<span class="function"><span class="keyword">void</span> <span class="title">fanoutSend</span><span class="params">(Company company)</span></span>;</span><br><span class="line">&#125;</span><br><span class="line"></span><br></pre></td></tr></table></figure>
<p><strong>RabbitMqSendMessageServiceImpl.java</strong></p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br><span class="line">55</span><br><span class="line">56</span><br><span class="line">57</span><br><span class="line">58</span><br><span class="line">59</span><br><span class="line">60</span><br><span class="line">61</span><br><span class="line">62</span><br><span class="line">63</span><br><span class="line">64</span><br><span class="line">65</span><br><span class="line">66</span><br><span class="line">67</span><br><span class="line">68</span><br><span class="line">69</span><br><span class="line">70</span><br><span class="line">71</span><br><span class="line">72</span><br><span class="line">73</span><br><span class="line">74</span><br><span class="line">75</span><br><span class="line">76</span><br><span class="line">77</span><br><span class="line">78</span><br><span class="line">79</span><br><span class="line">80</span><br><span class="line">81</span><br><span class="line">82</span><br><span class="line">83</span><br><span class="line">84</span><br><span class="line">85</span><br><span class="line">86</span><br><span class="line">87</span><br><span class="line">88</span><br><span class="line">89</span><br><span class="line">90</span><br><span class="line">91</span><br><span class="line">92</span><br><span class="line">93</span><br><span class="line">94</span><br><span class="line">95</span><br><span class="line">96</span><br><span class="line">97</span><br><span class="line">98</span><br><span class="line">99</span><br><span class="line">100</span><br><span class="line">101</span><br><span class="line">102</span><br><span class="line">103</span><br><span class="line">104</span><br><span class="line">105</span><br><span class="line">106</span><br><span class="line">107</span><br><span class="line">108</span><br><span class="line">109</span><br><span class="line">110</span><br><span class="line">111</span><br><span class="line">112</span><br><span class="line">113</span><br><span class="line">114</span><br><span class="line">115</span><br><span class="line">116</span><br><span class="line">117</span><br><span class="line">118</span><br><span class="line">119</span><br><span class="line">120</span><br><span class="line">121</span><br><span class="line">122</span><br><span class="line">123</span><br><span class="line">124</span><br><span class="line">125</span><br><span class="line">126</span><br><span class="line">127</span><br><span class="line">128</span><br><span class="line">129</span><br><span class="line">130</span><br><span class="line">131</span><br><span class="line">132</span><br><span class="line">133</span><br><span class="line">134</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">package</span> com.zkk.rabbitmq.producer.rabbitmq.impl;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> com.rabbitmq.client.Channel;</span><br><span class="line"><span class="keyword">import</span> com.zkk.rabbitmq.producer.domain.Company;</span><br><span class="line"><span class="keyword">import</span> com.zkk.rabbitmq.producer.rabbitmq.RabbitMqSendMessageService;</span><br><span class="line"><span class="keyword">import</span> net.sf.json.JSONObject;</span><br><span class="line"><span class="keyword">import</span> org.springframework.amqp.rabbit.connection.Connection;</span><br><span class="line"><span class="keyword">import</span> org.springframework.amqp.rabbit.connection.ConnectionFactory;</span><br><span class="line"><span class="keyword">import</span> org.springframework.amqp.rabbit.core.RabbitTemplate;</span><br><span class="line"><span class="keyword">import</span> org.springframework.beans.factory.annotation.Autowired;</span><br><span class="line"><span class="keyword">import</span> org.springframework.stereotype.Component;</span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> java.io.IOException;</span><br><span class="line"><span class="keyword">import</span> java.time.LocalDateTime;</span><br><span class="line"><span class="keyword">import</span> java.time.format.DateTimeFormatter;</span><br><span class="line"><span class="keyword">import</span> java.util.HashMap;</span><br><span class="line"><span class="keyword">import</span> java.util.Map;</span><br><span class="line"><span class="keyword">import</span> java.util.UUID;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line"><span class="comment">/**</span></span><br><span class="line"><span class="comment"> * description</span></span><br><span class="line"><span class="comment"> *</span></span><br><span class="line"><span class="comment"> * <span class="doctag">@author</span> Zaker 2020/05/16 20:09</span></span><br><span class="line"><span class="comment"> */</span></span><br><span class="line"><span class="meta">@Component</span></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">RabbitMqSendMessageServiceImpl</span> <span class="keyword">implements</span> <span class="title">RabbitMqSendMessageService</span> </span>&#123;</span><br><span class="line">	<span class="meta">@Autowired</span></span><br><span class="line">	<span class="keyword">private</span> RabbitTemplate rabbitTemplate;</span><br><span class="line"></span><br><span class="line">	<span class="comment">/**</span></span><br><span class="line"><span class="comment">	 *direct模式发送消息</span></span><br><span class="line"><span class="comment">	 *</span></span><br><span class="line"><span class="comment">	 * <span class="doctag">@param</span> company</span></span><br><span class="line"><span class="comment">	 * <span class="doctag">@author</span> Zaker 2020-05-16 22:10</span></span><br><span class="line"><span class="comment">	 * <span class="doctag">@return</span></span></span><br><span class="line"><span class="comment">	 */</span></span><br><span class="line">	<span class="meta">@Override</span></span><br><span class="line">	<span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">directSend</span><span class="params">(Company company)</span> </span>&#123;</span><br><span class="line">		String EXCHANGE_NAME=<span class="string">&quot;company-directExchange&quot;</span>;</span><br><span class="line"></span><br><span class="line">		ConnectionFactory connectionFactory= rabbitTemplate.getConnectionFactory();</span><br><span class="line">		Connection connection=connectionFactory.createConnection();</span><br><span class="line">		Channel channel=connection.createChannel(<span class="keyword">true</span>);</span><br><span class="line"></span><br><span class="line">		<span class="keyword">try</span> &#123;</span><br><span class="line">			channel.exchangeDeclare(EXCHANGE_NAME, <span class="string">&quot;direct&quot;</span>,<span class="keyword">true</span>,<span class="keyword">false</span>,<span class="keyword">null</span>);</span><br><span class="line">		&#125; <span class="keyword">catch</span> (IOException e) &#123;</span><br><span class="line">			e.printStackTrace();</span><br><span class="line">		&#125;</span><br><span class="line">		<span class="comment">//创建message的map，将要发送的消息封装进map中</span></span><br><span class="line">		Map&lt;String,Object&gt; map=<span class="keyword">new</span> HashMap&lt;&gt;();</span><br><span class="line">		String messageId = String.valueOf(UUID.randomUUID());</span><br><span class="line">		String messageData = objectToJson(company);</span><br><span class="line">		String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern(<span class="string">&quot;yyyy-MM-dd HH:mm:ss&quot;</span>));</span><br><span class="line">		map.put(<span class="string">&quot;messageId&quot;</span>,messageId);</span><br><span class="line">		map.put(<span class="string">&quot;messageData&quot;</span>,messageData);</span><br><span class="line">		map.put(<span class="string">&quot;createTime&quot;</span>,createTime);</span><br><span class="line">		rabbitTemplate.convertAndSend(EXCHANGE_NAME, <span class="string">&quot;company.direct&quot;</span>, map);</span><br><span class="line">		System.out.print(<span class="string">&quot;公司数据推送成功direct：&quot;</span>+company.getCompanyName());</span><br><span class="line">	&#125;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line"></span><br><span class="line">	<span class="meta">@Override</span></span><br><span class="line">	<span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">topicSend</span><span class="params">(Company company)</span> </span>&#123;</span><br><span class="line">		String EXCHANGE_NAME=<span class="string">&quot;company-topicExchange&quot;</span>;</span><br><span class="line">		ConnectionFactory connectionFactory= rabbitTemplate.getConnectionFactory();</span><br><span class="line">		Connection connection=connectionFactory.createConnection();</span><br><span class="line">		Channel channel=connection.createChannel(<span class="keyword">true</span>);</span><br><span class="line">		<span class="keyword">try</span> &#123;</span><br><span class="line">			channel.exchangeDeclare(EXCHANGE_NAME, <span class="string">&quot;topic&quot;</span>,<span class="keyword">true</span>,<span class="keyword">false</span>,<span class="keyword">null</span>);</span><br><span class="line">		&#125; <span class="keyword">catch</span> (IOException e) &#123;</span><br><span class="line">			e.printStackTrace();</span><br><span class="line">		&#125;</span><br><span class="line">		<span class="comment">/*</span></span><br><span class="line"><span class="comment">		 *</span></span><br><span class="line"><span class="comment">		 * exchange     交换机名称</span></span><br><span class="line"><span class="comment">		 * routingKey    路由</span></span><br><span class="line"><span class="comment">		 * object         消息体内容</span></span><br><span class="line"><span class="comment">		 * correlationData    可以自己指定的消息的唯一id</span></span><br><span class="line"><span class="comment">		 *</span></span><br><span class="line"><span class="comment">		 * */</span></span><br><span class="line">		Map&lt;String,Object&gt; map=<span class="keyword">new</span> HashMap&lt;&gt;();</span><br><span class="line">		String messageId = String.valueOf(UUID.randomUUID());</span><br><span class="line">		String messageData = objectToJson(company);</span><br><span class="line">		String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern(<span class="string">&quot;yyyy-MM-dd HH:mm:ss&quot;</span>));</span><br><span class="line">		map.put(<span class="string">&quot;messageId&quot;</span>,messageId);</span><br><span class="line">		map.put(<span class="string">&quot;messageData&quot;</span>,messageData);</span><br><span class="line">		map.put(<span class="string">&quot;createTime&quot;</span>,createTime);</span><br><span class="line"></span><br><span class="line">		rabbitTemplate.convertAndSend(EXCHANGE_NAME, <span class="string">&quot;company.topic&quot;</span>, map);</span><br><span class="line"></span><br><span class="line">		System.out.print(<span class="string">&quot;公司数据推送成功topic：&quot;</span>+company.getCompanyName());</span><br><span class="line">	&#125;</span><br><span class="line"></span><br><span class="line">	<span class="meta">@Override</span></span><br><span class="line">	<span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">fanoutSend</span><span class="params">(Company company)</span> </span>&#123;</span><br><span class="line">		String EXCHANGE_NAME=<span class="string">&quot;company-fanoutExchange&quot;</span>;</span><br><span class="line"></span><br><span class="line">		ConnectionFactory connectionFactory= rabbitTemplate.getConnectionFactory();</span><br><span class="line">		Connection connection=connectionFactory.createConnection();</span><br><span class="line">		Channel channel=connection.createChannel(<span class="keyword">true</span>);</span><br><span class="line">		<span class="keyword">try</span> &#123;</span><br><span class="line">			channel.exchangeDeclare(EXCHANGE_NAME, <span class="string">&quot;fanout&quot;</span>,<span class="keyword">true</span>,<span class="keyword">false</span>,<span class="keyword">null</span>);</span><br><span class="line">		&#125; <span class="keyword">catch</span> (IOException e) &#123;</span><br><span class="line">			e.printStackTrace();</span><br><span class="line">		&#125;</span><br><span class="line"></span><br><span class="line">		String messageId = String.valueOf(UUID.randomUUID());</span><br><span class="line">		String messageData = objectToJson(company);</span><br><span class="line">		String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern(<span class="string">&quot;yyyy-MM-dd HH:mm:ss&quot;</span>));</span><br><span class="line">		Map&lt;String,Object&gt; map=<span class="keyword">new</span> HashMap&lt;&gt;();</span><br><span class="line">		map.put(<span class="string">&quot;messageId&quot;</span>,messageId);</span><br><span class="line">		map.put(<span class="string">&quot;messageData&quot;</span>,messageData);</span><br><span class="line">		map.put(<span class="string">&quot;createTime&quot;</span>,createTime);</span><br><span class="line"></span><br><span class="line">		rabbitTemplate.convertAndSend(EXCHANGE_NAME, <span class="keyword">null</span>, map);</span><br><span class="line"></span><br><span class="line">		System.out.print(<span class="string">&quot;公司数据推送成功fanout：&quot;</span>+company.getCompanyName());</span><br><span class="line">	&#125;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line">	<span class="function"><span class="keyword">public</span>  String <span class="title">objectToJson</span><span class="params">(Object obj)</span>  </span>&#123;</span><br><span class="line">		JSONObject jsonObject=JSONObject.fromObject(obj);</span><br><span class="line">		String jsonStr  =jsonObject.toString();</span><br><span class="line">		<span class="keyword">return</span> jsonStr;</span><br><span class="line">	&#125;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line">&#125;</span><br><span class="line"></span><br></pre></td></tr></table></figure>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br><span class="line">55</span><br><span class="line">56</span><br><span class="line">57</span><br><span class="line">58</span><br><span class="line">59</span><br><span class="line">60</span><br><span class="line">61</span><br><span class="line">62</span><br><span class="line">63</span><br><span class="line">64</span><br><span class="line">65</span><br><span class="line">66</span><br><span class="line">67</span><br><span class="line">68</span><br><span class="line">69</span><br><span class="line">70</span><br><span class="line">71</span><br><span class="line">72</span><br><span class="line">73</span><br><span class="line">74</span><br><span class="line">75</span><br><span class="line">76</span><br><span class="line">77</span><br><span class="line">78</span><br><span class="line">79</span><br><span class="line">80</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">package</span> com.zkk.rabbitmq.producer.controller;</span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> com.sun.net.httpserver.Authenticator;</span><br><span class="line"><span class="keyword">import</span> com.zkk.rabbitmq.producer.domain.Company;</span><br><span class="line"><span class="keyword">import</span> com.zkk.rabbitmq.producer.rabbitmq.RabbitMqSendMessageService;</span><br><span class="line"><span class="keyword">import</span> com.zkk.rabbitmq.producer.service.CompanyService;</span><br><span class="line"><span class="keyword">import</span> org.springframework.beans.factory.annotation.Autowired;</span><br><span class="line"><span class="keyword">import</span> org.springframework.http.ResponseEntity;</span><br><span class="line"><span class="keyword">import</span> org.springframework.web.bind.annotation.PostMapping;</span><br><span class="line"><span class="keyword">import</span> org.springframework.web.bind.annotation.RequestMapping;</span><br><span class="line"><span class="keyword">import</span> org.springframework.web.bind.annotation.RequestParam;</span><br><span class="line"><span class="keyword">import</span> org.springframework.web.bind.annotation.RestController;</span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> java.util.List;</span><br><span class="line"></span><br><span class="line"><span class="comment">/**</span></span><br><span class="line"><span class="comment"> * description  生产者发送消息接口类</span></span><br><span class="line"><span class="comment"> *</span></span><br><span class="line"><span class="comment"> * <span class="doctag">@author</span> Zaker 2020/05/16 21:08</span></span><br><span class="line"><span class="comment"> */</span></span><br><span class="line"><span class="meta">@RestController</span></span><br><span class="line"><span class="meta">@RequestMapping(&quot;/v1/rabbitmq&quot;)</span></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">RabbitMqController</span> </span>&#123;</span><br><span class="line"></span><br><span class="line">   <span class="meta">@Autowired</span></span><br><span class="line">   <span class="keyword">private</span> RabbitMqSendMessageService rabbitMqSendMessageService;</span><br><span class="line">   <span class="meta">@Autowired</span></span><br><span class="line">    <span class="keyword">private</span> CompanyService companyService;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line">   <span class="meta">@PostMapping(&quot;/direct&quot;)</span></span><br><span class="line">   <span class="function"><span class="keyword">public</span> ResponseEntity <span class="title">directSend</span><span class="params">(<span class="meta">@RequestParam(required = false)</span> Long companyId,</span></span></span><br><span class="line"><span class="function"><span class="params">                            <span class="meta">@RequestParam(required = false)</span> String companyNumber,</span></span></span><br><span class="line"><span class="function"><span class="params">                            <span class="meta">@RequestParam(required = false)</span> String companyName)</span></span>&#123;</span><br><span class="line"></span><br><span class="line">      Company company=<span class="keyword">new</span> Company();</span><br><span class="line">      company.setCompanyId(companyId);</span><br><span class="line">      company.setCompanyName(companyNumber);</span><br><span class="line">      company.setCompanyNumber(companyName);</span><br><span class="line">      List&lt;Company&gt; companyList=companyService.queryCompanyBydto(company);</span><br><span class="line">      <span class="keyword">for</span>(Company c:companyList)&#123;</span><br><span class="line">         rabbitMqSendMessageService.directSend(c);</span><br><span class="line">      &#125;</span><br><span class="line">      <span class="keyword">return</span> ResponseEntity.ok(<span class="string">&quot;ok&quot;</span>);</span><br><span class="line">   &#125;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line">   <span class="meta">@PostMapping(&quot;/topic&quot;)</span></span><br><span class="line">   <span class="function"><span class="keyword">public</span> ResponseEntity <span class="title">topicSend</span><span class="params">(<span class="meta">@RequestParam(required = false)</span> Long companyId,</span></span></span><br><span class="line"><span class="function"><span class="params">                            <span class="meta">@RequestParam(required = false)</span> String companyNumber,</span></span></span><br><span class="line"><span class="function"><span class="params">                            <span class="meta">@RequestParam(required = false)</span> String companyName)</span></span>&#123;</span><br><span class="line"></span><br><span class="line">      Company company=<span class="keyword">new</span> Company();</span><br><span class="line">      company.setCompanyId(companyId);</span><br><span class="line">      company.setCompanyName(companyNumber);</span><br><span class="line">      company.setCompanyNumber(companyName);</span><br><span class="line">      List&lt;Company&gt; companyList=companyService.queryCompanyBydto(company);</span><br><span class="line">      <span class="keyword">for</span>(Company c:companyList)&#123;</span><br><span class="line">         rabbitMqSendMessageService.topicSend(c);</span><br><span class="line">      &#125;</span><br><span class="line">      <span class="keyword">return</span> ResponseEntity.ok(<span class="string">&quot;ok&quot;</span>);</span><br><span class="line">   &#125;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line">   <span class="meta">@PostMapping(&quot;/fanout&quot;)</span></span><br><span class="line">   <span class="function"><span class="keyword">public</span> ResponseEntity <span class="title">fanoutSend</span><span class="params">(<span class="meta">@RequestParam(required = false)</span> Long companyId,</span></span></span><br><span class="line"><span class="function"><span class="params">                           <span class="meta">@RequestParam(required = false)</span> String companyNumber,</span></span></span><br><span class="line"><span class="function"><span class="params">                           <span class="meta">@RequestParam(required = false)</span> String companyName)</span></span>&#123;</span><br><span class="line"></span><br><span class="line">      Company company=<span class="keyword">new</span> Company();</span><br><span class="line">      company.setCompanyId(companyId);</span><br><span class="line">      company.setCompanyName(companyNumber);</span><br><span class="line">      company.setCompanyNumber(companyName);</span><br><span class="line">      List&lt;Company&gt; companyList=companyService.queryCompanyBydto(company);</span><br><span class="line">      <span class="keyword">for</span>(Company c:companyList)&#123;</span><br><span class="line">         rabbitMqSendMessageService.fanoutSend(c);</span><br><span class="line">      &#125;</span><br><span class="line">      <span class="keyword">return</span> ResponseEntity.ok(<span class="string">&quot;ok&quot;</span>);</span><br><span class="line">   &#125;</span><br><span class="line">&#125;</span><br></pre></td></tr></table></figure>
<h3 id="rabbitmq-consumer-消费者）">rabbitmq-consumer(消费者）</h3>
<p><strong>POM文件</strong>：</p>
<figure class="highlight plain"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br></pre></td><td class="code"><pre><span class="line">&lt;dependency&gt;</span><br><span class="line">    &lt;groupId&gt;org.springframework.boot&lt;&#x2F;groupId&gt;</span><br><span class="line">    &lt;artifactId&gt;spring-boot-starter-web&lt;&#x2F;artifactId&gt;</span><br><span class="line">&lt;&#x2F;dependency&gt;</span><br><span class="line"></span><br><span class="line">&lt;!--rabbitmq--&gt;</span><br><span class="line">&lt;dependency&gt;</span><br><span class="line">    &lt;groupId&gt;org.springframework.boot&lt;&#x2F;groupId&gt;</span><br><span class="line">    &lt;artifactId&gt;spring-boot-starter-amqp&lt;&#x2F;artifactId&gt;</span><br><span class="line">&lt;&#x2F;dependency&gt;</span><br></pre></td></tr></table></figure>
<p><strong>application.yml</strong>:</p>
<figure class="highlight yaml"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br></pre></td><td class="code"><pre><span class="line"><span class="attr">spring:</span></span><br><span class="line">  <span class="attr">datasource:</span></span><br><span class="line">      <span class="attr">url:</span> <span class="string">jdbc:mysql://192.168.56.2:3306/hodr_system?useUnicode=true&amp;characterEncoding=utf-8&amp;useSSL=false&amp;serverTimezone=UTC</span></span><br><span class="line">      <span class="attr">username:</span> <span class="string">root</span></span><br><span class="line">      <span class="attr">password:</span> <span class="number">123456</span></span><br><span class="line">  <span class="comment">#项目名称</span></span><br><span class="line">  <span class="attr">application:</span></span><br><span class="line">    <span class="attr">name:</span> <span class="string">rabbitmq-producer</span></span><br><span class="line"></span><br><span class="line"></span><br><span class="line">  <span class="comment">#配置rabbitMq 服务器</span></span><br><span class="line">  <span class="attr">rabbitmq:</span></span><br><span class="line">  <span class="comment">#    host: 192.168.56.2</span></span><br><span class="line">  <span class="comment">#    port: 5672</span></span><br><span class="line">    <span class="attr">addresses:</span> <span class="number">192.168</span><span class="number">.56</span><span class="number">.2</span><span class="string">:5672</span></span><br><span class="line">    <span class="attr">username:</span> <span class="string">guest</span></span><br><span class="line">    <span class="attr">password:</span> <span class="string">guest</span></span><br><span class="line">    <span class="comment">#虚拟host 可以不设置,默认是 /</span></span><br><span class="line">    <span class="attr">virtual-host:</span> <span class="string">/</span></span><br><span class="line">    <span class="attr">durable:</span> <span class="literal">true</span></span><br><span class="line">    <span class="attr">auto-delete:</span> <span class="literal">false</span></span><br><span class="line">    <span class="comment">#配置监听属性</span></span><br><span class="line">    <span class="attr">listener:</span></span><br><span class="line">      <span class="attr">simple:</span></span><br><span class="line">        <span class="attr">acknowledge-mode:</span> <span class="string">manual</span></span><br><span class="line">        <span class="attr">concurrency:</span> <span class="number">10</span></span><br><span class="line">        <span class="attr">max-concurrency:</span> <span class="number">20</span></span><br><span class="line"><span class="comment">#    istener:</span></span><br><span class="line"><span class="comment">#      simple:</span></span><br><span class="line"><span class="comment">#        #自动签收auto 手动 manual</span></span><br><span class="line"><span class="comment">#        acknowledge-mode: manual</span></span><br><span class="line"><span class="comment">#        #最大并发数</span></span><br><span class="line"><span class="comment">#        concurrency: 10</span></span><br><span class="line"></span><br><span class="line"></span><br><span class="line"><span class="attr">jackson:</span></span><br><span class="line">  <span class="attr">date-format:</span> <span class="string">java.text.SimpleDateFormat</span></span><br><span class="line">  <span class="attr">time-zone:</span> <span class="string">GMT+8</span></span><br><span class="line">  <span class="attr">default-property-inclusion:</span> <span class="string">non_null</span></span><br><span class="line"></span><br><span class="line"><span class="meta">---</span></span><br><span class="line"></span><br><span class="line"><span class="attr">server:</span></span><br><span class="line">  <span class="attr">port:</span> <span class="number">8200</span></span><br><span class="line">  <span class="comment">#mybatis配置</span></span><br><span class="line"></span><br><span class="line"><span class="attr">mybatis:</span></span><br><span class="line">  <span class="attr">mapperLocations:</span> <span class="string">classpath*:/mapper/*.xml</span></span><br><span class="line">  <span class="attr">configuration:</span></span><br><span class="line">    <span class="attr">mapUnderscoreToCamelCase:</span> <span class="literal">true</span></span><br><span class="line">    <span class="attr">log-impl:</span> <span class="string">org.apache.ibatis.logging.stdout.StdOutImpl</span></span><br><span class="line"></span><br></pre></td></tr></table></figure>
<p><strong>RabbitMqListener.java</strong>(主要的消息监听类)</p>
<figure class="highlight java"><table><tr><td class="gutter"><pre><span class="line">1</span><br><span class="line">2</span><br><span class="line">3</span><br><span class="line">4</span><br><span class="line">5</span><br><span class="line">6</span><br><span class="line">7</span><br><span class="line">8</span><br><span class="line">9</span><br><span class="line">10</span><br><span class="line">11</span><br><span class="line">12</span><br><span class="line">13</span><br><span class="line">14</span><br><span class="line">15</span><br><span class="line">16</span><br><span class="line">17</span><br><span class="line">18</span><br><span class="line">19</span><br><span class="line">20</span><br><span class="line">21</span><br><span class="line">22</span><br><span class="line">23</span><br><span class="line">24</span><br><span class="line">25</span><br><span class="line">26</span><br><span class="line">27</span><br><span class="line">28</span><br><span class="line">29</span><br><span class="line">30</span><br><span class="line">31</span><br><span class="line">32</span><br><span class="line">33</span><br><span class="line">34</span><br><span class="line">35</span><br><span class="line">36</span><br><span class="line">37</span><br><span class="line">38</span><br><span class="line">39</span><br><span class="line">40</span><br><span class="line">41</span><br><span class="line">42</span><br><span class="line">43</span><br><span class="line">44</span><br><span class="line">45</span><br><span class="line">46</span><br><span class="line">47</span><br><span class="line">48</span><br><span class="line">49</span><br><span class="line">50</span><br><span class="line">51</span><br><span class="line">52</span><br><span class="line">53</span><br><span class="line">54</span><br><span class="line">55</span><br><span class="line">56</span><br><span class="line">57</span><br><span class="line">58</span><br><span class="line">59</span><br><span class="line">60</span><br><span class="line">61</span><br><span class="line">62</span><br><span class="line">63</span><br><span class="line">64</span><br><span class="line">65</span><br><span class="line">66</span><br><span class="line">67</span><br><span class="line">68</span><br><span class="line">69</span><br><span class="line">70</span><br><span class="line">71</span><br><span class="line">72</span><br><span class="line">73</span><br><span class="line">74</span><br><span class="line">75</span><br><span class="line">76</span><br><span class="line">77</span><br><span class="line">78</span><br><span class="line">79</span><br><span class="line">80</span><br><span class="line">81</span><br><span class="line">82</span><br><span class="line">83</span><br><span class="line">84</span><br><span class="line">85</span><br><span class="line">86</span><br><span class="line">87</span><br><span class="line">88</span><br><span class="line">89</span><br><span class="line">90</span><br><span class="line">91</span><br><span class="line">92</span><br><span class="line">93</span><br><span class="line">94</span><br><span class="line">95</span><br><span class="line">96</span><br><span class="line">97</span><br><span class="line">98</span><br><span class="line">99</span><br><span class="line">100</span><br><span class="line">101</span><br><span class="line">102</span><br><span class="line">103</span><br><span class="line">104</span><br><span class="line">105</span><br><span class="line">106</span><br></pre></td><td class="code"><pre><span class="line"><span class="keyword">package</span> com.zkk.rabbitmq.consumer.rabbitmq;</span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> com.rabbitmq.client.AMQP;</span><br><span class="line"><span class="keyword">import</span> com.rabbitmq.client.Channel;</span><br><span class="line"><span class="keyword">import</span> com.zkk.rabbitmq.consumer.domain.Company;</span><br><span class="line"><span class="keyword">import</span> com.zkk.rabbitmq.consumer.service.RabbitMqListenerProcessService;</span><br><span class="line"><span class="keyword">import</span> org.springframework.amqp.rabbit.annotation.*;</span><br><span class="line"><span class="keyword">import</span> org.springframework.amqp.support.AmqpHeaders;</span><br><span class="line"><span class="keyword">import</span> org.springframework.beans.factory.annotation.Autowired;</span><br><span class="line"><span class="keyword">import</span> org.springframework.messaging.handler.annotation.Headers;</span><br><span class="line"><span class="keyword">import</span> org.springframework.messaging.handler.annotation.Payload;</span><br><span class="line"><span class="keyword">import</span> org.springframework.stereotype.Component;</span><br><span class="line"></span><br><span class="line"></span><br><span class="line"><span class="keyword">import</span> java.util.Map;</span><br><span class="line"></span><br><span class="line"><span class="comment">/**</span></span><br><span class="line"><span class="comment"> * description</span></span><br><span class="line"><span class="comment"> *</span></span><br><span class="line"><span class="comment"> * <span class="doctag">@author</span> Zaker 2020/05/17 1:38</span></span><br><span class="line"><span class="comment"> */</span></span><br><span class="line"><span class="meta">@Component</span></span><br><span class="line"><span class="keyword">public</span> <span class="class"><span class="keyword">class</span> <span class="title">RabbitMqListener</span> </span>&#123;</span><br><span class="line">	<span class="meta">@Autowired</span></span><br><span class="line">	<span class="keyword">private</span> RabbitMqListenerProcessService rabbitMqListenerProcessService;</span><br><span class="line">	<span class="comment">//@RabbitListener(queues = &quot;QU_SPC_RECE_PERNR&quot;,autoStartup=&quot;false&quot;,id = &quot;0001&quot;)</span></span><br><span class="line">	<span class="meta">@RabbitListener(bindings =@QueueBinding(</span></span><br><span class="line"><span class="meta">			value = @Queue(value = &quot;company-directQueue&quot;, durable = &quot;true&quot;, autoDelete = &quot;false&quot;),</span></span><br><span class="line"><span class="meta">			exchange = @Exchange(name = &quot;company-directExchange&quot;,durable = &quot;true&quot;,</span></span><br><span class="line"><span class="meta">											type = &quot;direct&quot;,autoDelete = &quot;false&quot;),</span></span><br><span class="line"><span class="meta">			key = &quot;company.direct&quot;</span></span><br><span class="line"><span class="meta">		)</span></span><br><span class="line"><span class="meta">	)</span></span><br><span class="line">	<span class="meta">@RabbitHandler</span></span><br><span class="line">	<span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">directRecive</span><span class="params">(<span class="meta">@Payload</span> Map messageMap,</span></span></span><br><span class="line"><span class="function"><span class="params">						<span class="meta">@Headers</span> Map&lt;String ,Object&gt; headers,</span></span></span><br><span class="line"><span class="function"><span class="params">						Channel channel</span></span></span><br><span class="line"><span class="function"><span class="params">	)</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">		System.out.println(<span class="string">&quot;----direct收到消息，开始消费-----&quot;</span>);</span><br><span class="line">		Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);</span><br><span class="line">		<span class="keyword">try</span> &#123;</span><br><span class="line">			System.out.println(<span class="string">&quot;公司编号：&quot;</span>+messageMap.toString() );</span><br><span class="line">			<span class="comment">//手动签收</span></span><br><span class="line">			channel.basicAck(deliveryTag, <span class="keyword">false</span>);</span><br><span class="line">		&#125; <span class="keyword">catch</span> (Exception e) &#123;</span><br><span class="line">			<span class="comment">//手动拒签</span></span><br><span class="line">			channel.basicNack(deliveryTag, <span class="keyword">false</span>, <span class="keyword">true</span>);</span><br><span class="line">			System.out.println(<span class="string">&quot;消息拒签，停止队列监听&quot;</span>);</span><br><span class="line">			rabbitMqListenerProcessService.rabbitMqListenerStop(<span class="string">&quot;&quot;</span>);</span><br><span class="line">		&#125;</span><br><span class="line">	&#125;</span><br><span class="line"></span><br><span class="line">	<span class="meta">@RabbitListener(bindings =@QueueBinding(</span></span><br><span class="line"><span class="meta">			value = @Queue(value = &quot;company-topicQueue&quot;, durable = &quot;true&quot;, autoDelete = &quot;false&quot;),</span></span><br><span class="line"><span class="meta">			exchange = @Exchange(name = &quot;company-topicExchange&quot;,durable = &quot;true&quot;,</span></span><br><span class="line"><span class="meta">					type = &quot;topic&quot;,autoDelete = &quot;false&quot;),</span></span><br><span class="line"><span class="meta">			key = &quot;company.#&quot;</span></span><br><span class="line"><span class="meta">	)</span></span><br><span class="line"><span class="meta">	)</span></span><br><span class="line">	<span class="meta">@RabbitHandler</span></span><br><span class="line">	<span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">topicRecive</span><span class="params">(<span class="meta">@Payload</span> Map messageMap,</span></span></span><br><span class="line"><span class="function"><span class="params">						<span class="meta">@Headers</span> Map&lt;String ,Object&gt; headers,</span></span></span><br><span class="line"><span class="function"><span class="params">						Channel channel</span></span></span><br><span class="line"><span class="function"><span class="params">	)</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">		System.out.println(<span class="string">&quot;----topic收到消息，开始消费-----&quot;</span>);</span><br><span class="line">		Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);</span><br><span class="line">		<span class="keyword">try</span> &#123;</span><br><span class="line">			<span class="comment">//System.out.println(&quot;公司编号：&quot;+company.getCompanyNumber() );</span></span><br><span class="line">			System.out.println(<span class="string">&quot;接收到的消息：&quot;</span>+messageMap.toString() );</span><br><span class="line">			<span class="comment">//手动签收</span></span><br><span class="line">			channel.basicAck(deliveryTag, <span class="keyword">false</span>);</span><br><span class="line">		&#125; <span class="keyword">catch</span> (Exception e) &#123;</span><br><span class="line">			<span class="comment">//手动拒签</span></span><br><span class="line">			channel.basicNack(deliveryTag, <span class="keyword">false</span>, <span class="keyword">true</span>);</span><br><span class="line">			System.out.println(<span class="string">&quot;消息拒签，停止队列监听&quot;</span>);</span><br><span class="line">			rabbitMqListenerProcessService.rabbitMqListenerStop(<span class="string">&quot;&quot;</span>);</span><br><span class="line">		&#125;</span><br><span class="line">	&#125;</span><br><span class="line"></span><br><span class="line">	<span class="meta">@RabbitListener(bindings =@QueueBinding(</span></span><br><span class="line"><span class="meta">			value = @Queue(value = &quot;company-fanoutQueue&quot;, durable = &quot;true&quot;, autoDelete = &quot;false&quot;),</span></span><br><span class="line"><span class="meta">			exchange = @Exchange(name = &quot;company-fanoutExchange&quot;,durable = &quot;true&quot;,</span></span><br><span class="line"><span class="meta">					type = &quot;fanout&quot;,autoDelete = &quot;false&quot;)</span></span><br><span class="line"><span class="meta">	)</span></span><br><span class="line"><span class="meta">	)</span></span><br><span class="line">	<span class="meta">@RabbitHandler</span></span><br><span class="line">	<span class="function"><span class="keyword">public</span> <span class="keyword">void</span> <span class="title">fanoutRecive</span><span class="params">(<span class="meta">@Payload</span> Map messageMap,</span></span></span><br><span class="line"><span class="function"><span class="params">							<span class="meta">@Headers</span> Map&lt;String ,Object&gt; headers,</span></span></span><br><span class="line"><span class="function"><span class="params">							Channel channel</span></span></span><br><span class="line"><span class="function"><span class="params">	)</span> <span class="keyword">throws</span> Exception </span>&#123;</span><br><span class="line">		System.out.println(<span class="string">&quot;----fanout收到消息，开始消费-----&quot;</span>);</span><br><span class="line">		Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);</span><br><span class="line">		<span class="keyword">try</span> &#123;</span><br><span class="line">			System.out.println(<span class="string">&quot;接收到的消息：&quot;</span>+messageMap.get(<span class="string">&quot;messageData&quot;</span>).toString() );</span><br><span class="line">			<span class="comment">//手动签收</span></span><br><span class="line">			channel.basicAck(deliveryTag, <span class="keyword">false</span>);</span><br><span class="line">		&#125; <span class="keyword">catch</span> (Exception e) &#123;</span><br><span class="line">			<span class="comment">//手动拒签</span></span><br><span class="line">			channel.basicNack(deliveryTag, <span class="keyword">false</span>, <span class="keyword">true</span>);</span><br><span class="line">			System.out.println(<span class="string">&quot;消息拒签，停止队列监听&quot;</span>);</span><br><span class="line">			rabbitMqListenerProcessService.rabbitMqListenerStop(<span class="string">&quot;&quot;</span>);</span><br><span class="line">		&#125;</span><br><span class="line">	&#125;</span><br><span class="line"></span><br><span class="line">&#125;</span><br><span class="line"></span><br></pre></td></tr></table></figure>
</article><div class="post-copyright"><div class="post-copyright__author"><span class="post-copyright-meta">文章作者: </span><span class="post-copyright-info"><a href="mailto:undefined">张十三</a></span></div><div class="post-copyright__type"><span class="post-copyright-meta">文章链接: </span><span class="post-copyright-info"><a href="http://zuori.gitee.io/zaker/2021/03/26/RabbitMq%E5%9F%BA%E7%A1%80%E4%BD%BF%E7%94%A8/">http://zuori.gitee.io/zaker/2021/03/26/RabbitMq%E5%9F%BA%E7%A1%80%E4%BD%BF%E7%94%A8/</a></span></div><div class="post-copyright__notice"><span class="post-copyright-meta">版权声明: </span><span class="post-copyright-info">本博客所有文章除特别声明外，均采用 <a href="https://creativecommons.org/licenses/by-nc-sa/4.0/" target="_blank">CC BY-NC-SA 4.0</a> 许可协议。转载请注明来自 <a href="http://zuori.gitee.io/zaker" target="_blank">十三不知</a>！</span></div></div><div class="tag_share"><div class="post-meta__tag-list"><a class="post-meta__tags" href="/zaker/tags/MQ/">MQ</a></div><div class="post_share"><div class="addthis_inline_share_toolbox"></div><script src="//s7.addthis.com/js/300/addthis_widget.js#pubid=null" async="async"></script></div></div><nav class="pagination-post" id="pagination"><div class="prev-post pull-left"><a href="/zaker/2021/03/30/Kubernetes%E9%9B%86%E7%BE%A4%E7%9B%91%E6%8E%A7%E4%B9%8B%20Metrics%20Server/"><img class="prev-cover" src="https://cdn.jsdelivr.net/npm/butterfly-extsrc@1/img/default.jpg" onerror="onerror=null;src='/zaker/img/404.jpg'" alt="cover of previous post"><div class="pagination-info"><div class="label">上一篇</div><div class="prev_info">Kubernetes集群监控之 Metrics Server</div></div></a></div><div class="next-post pull-right"><a href="/zaker/2021/03/25/K8s%E5%8F%AF%E8%A7%86%E5%8C%96%E7%9B%91%E6%8E%A7%E4%B9%8B-Weave%20Scope/"><img class="next-cover" src="https://cdn.jsdelivr.net/npm/butterfly-extsrc@1/img/default.jpg" onerror="onerror=null;src='/zaker/img/404.jpg'" alt="cover of next post"><div class="pagination-info"><div class="label">下一篇</div><div class="next_info">Kubernetes集群可视化监控之-Weave Scope</div></div></a></div></nav></div><div class="aside-content" id="aside-content"><div class="card-widget card-info"><div class="card-info-avatar is-center"><img class="avatar-img" src="https://gitee.com/zuori/kaiscript/raw/master/images/20210325192506.jpg" onerror="this.onerror=null;this.src='/zaker/img/friend_404.gif'" alt="avatar"/><div class="author-info__name">张十三</div><div class="author-info__description"></div></div><div class="card-info-data"><div class="card-info-data-item is-center"><a href="/zaker/archives/"><div class="headline">文章</div><div class="length-num">13</div></a></div><div class="card-info-data-item is-center"><a href="/zaker/tags/"><div class="headline">标签</div><div class="length-num">11</div></a></div><div class="card-info-data-item is-center"><a href="/zaker/categories/"><div class="headline">分类</div><div class="length-num">4</div></a></div></div><a class="button--animated" id="card-info-btn" href="http://zuori.gitee.io/zaker"><i class="fab fa-github"></i><span>Follow Me</span></a></div><div class="card-widget card-announcement"><div class="item-headline"><i class="fas fa-bullhorn card-announcement-animation"></i><span>公告</span></div><div class="announcement_content">记录一点东西的地方</div></div><div class="sticky_layout"><div class="card-widget" id="card-toc"><div class="item-headline"><i class="fas fa-stream"></i><span>目录</span></div><div class="toc-content"><ol class="toc"><li class="toc-item toc-level-1"><a class="toc-link"><span class="toc-number">1.</span> <span class="toc-text">RabbitMq学习笔记</span></a><ol class="toc-child"><li class="toc-item toc-level-2"><a class="toc-link" href="#RabbitMQ%E7%AE%80%E4%BB%8B"><span class="toc-number">1.1.</span> <span class="toc-text">RabbitMQ简介</span></a><ol class="toc-child"><li class="toc-item toc-level-3"><a class="toc-link" href="#%E4%B8%BA%E4%BB%80%E4%B9%88%E8%A6%81%E4%BD%BF%E7%94%A8RabbitMQ"><span class="toc-number">1.1.1.</span> <span class="toc-text">为什么要使用RabbitMQ</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#RabbitMQ-%E5%BA%94%E7%94%A8%E5%9C%BA%E6%99%AF"><span class="toc-number">1.1.2.</span> <span class="toc-text">RabbitMQ 应用场景</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#MQ-%E7%9A%84%E7%A9%BA%E9%97%B4%E4%B8%8E%E6%97%B6%E9%97%B4%E8%A7%A3%E8%80%A6"><span class="toc-number">1.1.3.</span> <span class="toc-text">MQ 的空间与时间解耦</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#%E4%B8%BB%E6%B5%81%E6%B6%88%E6%81%AF%E4%B8%AD%E9%97%B4%E4%BB%B6%E4%B8%80%E8%A7%88"><span class="toc-number">1.1.4.</span> <span class="toc-text">主流消息中间件一览</span></a></li></ol></li><li class="toc-item toc-level-2"><a class="toc-link" href="#RabbitMQ%E6%9E%B6%E6%9E%84"><span class="toc-number">1.2.</span> <span class="toc-text">RabbitMQ架构</span></a></li><li class="toc-item toc-level-2"><a class="toc-link" href="#RabbitMQ%E7%9A%84%E5%AE%89%E8%A3%85%E4%B8%8E%E4%BD%BF%E7%94%A8"><span class="toc-number">1.3.</span> <span class="toc-text">RabbitMQ的安装与使用</span></a><ol class="toc-child"><li class="toc-item toc-level-3"><a class="toc-link" href="#RPM%E6%96%B9%E5%BC%8F%E5%AE%89%E8%A3%85RabbitMQ"><span class="toc-number">1.3.1.</span> <span class="toc-text">RPM方式安装RabbitMQ</span></a><ol class="toc-child"><li class="toc-item toc-level-4"><a class="toc-link" href="#%E5%87%86%E5%A4%87%E6%9C%8D%E5%8A%A1%E5%99%A8%E7%8E%AF%E5%A2%83"><span class="toc-number">1.3.1.1.</span> <span class="toc-text">准备服务器环境</span></a></li><li class="toc-item toc-level-4"><a class="toc-link" href="#%E8%8E%B7%E5%8F%96%E5%AE%89%E8%A3%85%E5%8C%85"><span class="toc-number">1.3.1.2.</span> <span class="toc-text">获取安装包</span></a></li><li class="toc-item toc-level-4"><a class="toc-link" href="#%E5%BC%80%E5%A7%8B%E5%AE%89%E8%A3%85"><span class="toc-number">1.3.1.3.</span> <span class="toc-text">开始安装</span></a></li><li class="toc-item toc-level-4"><a class="toc-link" href="#%E8%BF%90%E8%A1%8C%E4%B8%8E%E5%90%AF%E5%8A%A8"><span class="toc-number">1.3.1.4.</span> <span class="toc-text">运行与启动</span></a></li><li class="toc-item toc-level-4"><a class="toc-link" href="#%E7%AE%A1%E7%90%86%E7%95%8C%E9%9D%A2%E5%B8%B8%E8%A7%81%E6%93%8D%E4%BD%9C"><span class="toc-number">1.3.1.5.</span> <span class="toc-text">管理界面常见操作</span></a><ol class="toc-child"><li class="toc-item toc-level-5"><a class="toc-link" href="#%E9%85%8D%E7%BD%AEVirtual-hosts"><span class="toc-number">1.3.1.5.1.</span> <span class="toc-text">配置Virtual hosts</span></a></li><li class="toc-item toc-level-5"><a class="toc-link" href="#%E5%88%9B%E5%BB%BA%E7%94%A8%E6%88%B7"><span class="toc-number">1.3.1.5.2.</span> <span class="toc-text">创建用户</span></a></li></ol></li><li class="toc-item toc-level-4"><a class="toc-link" href="#%E6%93%8D%E4%BD%9C%E5%91%BD%E4%BB%A4"><span class="toc-number">1.3.1.6.</span> <span class="toc-text">操作命令</span></a></li></ol></li><li class="toc-item toc-level-3"><a class="toc-link" href="#Docker%E5%AE%89%E8%A3%85RabbitMQ"><span class="toc-number">1.3.2.</span> <span class="toc-text">Docker安装RabbitMQ</span></a></li></ol></li><li class="toc-item toc-level-2"><a class="toc-link" href="#RabbitMQ%E7%9A%84%E6%A0%B8%E5%BF%83%E6%A6%82%E5%BF%B5"><span class="toc-number">1.4.</span> <span class="toc-text">RabbitMQ的核心概念</span></a><ol class="toc-child"><li class="toc-item toc-level-3"><a class="toc-link" href="#Broker"><span class="toc-number">1.4.1.</span> <span class="toc-text">Broker</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#Connection"><span class="toc-number">1.4.2.</span> <span class="toc-text">Connection</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#Message"><span class="toc-number">1.4.3.</span> <span class="toc-text">Message</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#Channel"><span class="toc-number">1.4.4.</span> <span class="toc-text">Channel</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#Publisher"><span class="toc-number">1.4.5.</span> <span class="toc-text">Publisher</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#Exchange%EF%BC%88%E5%B0%86%E6%B6%88%E6%81%AF%E8%B7%AF%E7%94%B1%E7%BB%99%E9%98%9F%E5%88%97-%EF%BC%89"><span class="toc-number">1.4.6.</span> <span class="toc-text">Exchange（将消息路由给队列 ）</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#Binding%EF%BC%88%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E5%92%8C%E4%BA%A4%E6%8D%A2%E5%99%A8%E4%B9%8B%E9%97%B4%E7%9A%84%E5%85%B3%E8%81%94%EF%BC%89"><span class="toc-number">1.4.7.</span> <span class="toc-text">Binding（消息队列和交换器之间的关联）</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#Queue"><span class="toc-number">1.4.8.</span> <span class="toc-text">Queue</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#Consumer"><span class="toc-number">1.4.9.</span> <span class="toc-text">Consumer</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#Virtual-Host"><span class="toc-number">1.4.10.</span> <span class="toc-text">Virtual Host</span></a></li></ol></li><li class="toc-item toc-level-2"><a class="toc-link" href="#%E4%BA%A4%E6%8D%A2%E6%9C%BA%E7%9A%84%E6%A8%A1%E5%BC%8F"><span class="toc-number">1.5.</span> <span class="toc-text">交换机的模式</span></a><ol class="toc-child"><li class="toc-item toc-level-3"><a class="toc-link" href="#fanout"><span class="toc-number">1.5.1.</span> <span class="toc-text">fanout</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#Direct"><span class="toc-number">1.5.2.</span> <span class="toc-text">Direct</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#Topic"><span class="toc-number">1.5.3.</span> <span class="toc-text">Topic</span></a></li></ol></li><li class="toc-item toc-level-2"><a class="toc-link" href="#%E6%9E%84%E5%BB%BA%E4%B8%80%E4%B8%AA%E7%AE%80%E5%8D%95%E7%9A%84%E7%94%9F%E4%BA%A7%E8%80%85%E4%B8%8E%E6%B6%88%E8%B4%B9%E8%80%85%E6%A8%A1%E5%9E%8B"><span class="toc-number">1.6.</span> <span class="toc-text">构建一个简单的生产者与消费者模型</span></a><ol class="toc-child"><li class="toc-item toc-level-3"><a class="toc-link" href="#%E6%9E%84%E5%BB%BA%E7%94%9F%E4%BA%A7%E8%80%85%E4%B8%8E%E6%B6%88%E8%B4%B9%E8%80%85%E6%AD%A5%E9%AA%A4"><span class="toc-number">1.6.1.</span> <span class="toc-text">构建生产者与消费者步骤</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#%E7%94%9F%E4%BA%A7%E8%80%85%E5%AE%9E%E4%BE%8B"><span class="toc-number">1.6.2.</span> <span class="toc-text">生产者实例</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#%E6%B6%88%E8%B4%B9%E8%80%85%E5%AE%9E%E4%BE%8B"><span class="toc-number">1.6.3.</span> <span class="toc-text">消费者实例</span></a></li></ol></li><li class="toc-item toc-level-2"><a class="toc-link" href="#SpringBoot%E6%95%B4%E5%90%88RabbitMQ"><span class="toc-number">1.7.</span> <span class="toc-text">SpringBoot整合RabbitMQ</span></a><ol class="toc-child"><li class="toc-item toc-level-3"><a class="toc-link" href="#rabbitmq-producer%EF%BC%88%E7%94%9F%E4%BA%A7%E8%80%85%EF%BC%89"><span class="toc-number">1.7.1.</span> <span class="toc-text">rabbitmq-producer（生产者）</span></a></li><li class="toc-item toc-level-3"><a class="toc-link" href="#rabbitmq-consumer-%E6%B6%88%E8%B4%B9%E8%80%85%EF%BC%89"><span class="toc-number">1.7.2.</span> <span class="toc-text">rabbitmq-consumer(消费者）</span></a></li></ol></li></ol></li></ol></div></div><div class="card-widget card-recent-post"><div class="item-headline"><i class="fas fa-history"></i><span>最新文章</span></div><div class="aside-list"><div class="aside-list-item"><a class="thumbnail" href="/zaker/2021/03/30/Kubernetes%E9%9B%86%E7%BE%A4%E7%9B%91%E6%8E%A7%E4%B9%8B%20Metrics%20Server/" title="Kubernetes集群监控之 Metrics Server"><img src="https://cdn.jsdelivr.net/npm/butterfly-extsrc@1/img/default.jpg" onerror="this.onerror=null;this.src='/zaker/img/404.jpg'" alt="Kubernetes集群监控之 Metrics Server"/></a><div class="content"><a class="title" href="/zaker/2021/03/30/Kubernetes%E9%9B%86%E7%BE%A4%E7%9B%91%E6%8E%A7%E4%B9%8B%20Metrics%20Server/" title="Kubernetes集群监控之 Metrics Server">Kubernetes集群监控之 Metrics Server</a><time datetime="2021-03-30T06:46:36.000Z" title="发表于 2021-03-30 14:46:36">2021-03-30</time></div></div><div class="aside-list-item"><a class="thumbnail" href="/zaker/2021/03/26/RabbitMq%E5%9F%BA%E7%A1%80%E4%BD%BF%E7%94%A8/" title="RabbitMq学习笔记"><img src="https://cdn.jsdelivr.net/npm/butterfly-extsrc@1/img/default.jpg" onerror="this.onerror=null;this.src='/zaker/img/404.jpg'" alt="RabbitMq学习笔记"/></a><div class="content"><a class="title" href="/zaker/2021/03/26/RabbitMq%E5%9F%BA%E7%A1%80%E4%BD%BF%E7%94%A8/" title="RabbitMq学习笔记">RabbitMq学习笔记</a><time datetime="2021-03-26T01:46:36.000Z" title="发表于 2021-03-26 09:46:36">2021-03-26</time></div></div><div class="aside-list-item"><a class="thumbnail" href="/zaker/2021/03/25/K8s%E5%8F%AF%E8%A7%86%E5%8C%96%E7%9B%91%E6%8E%A7%E4%B9%8B-Weave%20Scope/" title="Kubernetes集群可视化监控之-Weave Scope"><img src="https://cdn.jsdelivr.net/npm/butterfly-extsrc@1/img/default.jpg" onerror="this.onerror=null;this.src='/zaker/img/404.jpg'" alt="Kubernetes集群可视化监控之-Weave Scope"/></a><div class="content"><a class="title" href="/zaker/2021/03/25/K8s%E5%8F%AF%E8%A7%86%E5%8C%96%E7%9B%91%E6%8E%A7%E4%B9%8B-Weave%20Scope/" title="Kubernetes集群可视化监控之-Weave Scope">Kubernetes集群可视化监控之-Weave Scope</a><time datetime="2021-03-25T11:46:36.000Z" title="发表于 2021-03-25 19:46:36">2021-03-25</time></div></div><div class="aside-list-item"><a class="thumbnail" href="/zaker/2021/03/25/Linux%E5%B0%86%E6%99%AE%E9%80%9A%E7%94%A8%E6%88%B7%E5%8A%A0%E5%85%A5sudo%E7%BB%84/" title="Linux将一般用户加入sudo组"><img src="https://cdn.jsdelivr.net/npm/butterfly-extsrc@1/img/default.jpg" onerror="this.onerror=null;this.src='/zaker/img/404.jpg'" alt="Linux将一般用户加入sudo组"/></a><div class="content"><a class="title" href="/zaker/2021/03/25/Linux%E5%B0%86%E6%99%AE%E9%80%9A%E7%94%A8%E6%88%B7%E5%8A%A0%E5%85%A5sudo%E7%BB%84/" title="Linux将一般用户加入sudo组">Linux将一般用户加入sudo组</a><time datetime="2021-03-25T11:46:36.000Z" title="发表于 2021-03-25 19:46:36">2021-03-25</time></div></div><div class="aside-list-item"><a class="thumbnail" href="/zaker/2021/03/03/Kubectl%E5%9F%BA%E6%9C%AC%E4%BD%BF%E7%94%A8/" title="kubectl 基本使用"><img src="https://cdn.jsdelivr.net/npm/butterfly-extsrc@1/img/default.jpg" onerror="this.onerror=null;this.src='/zaker/img/404.jpg'" alt="kubectl 基本使用"/></a><div class="content"><a class="title" href="/zaker/2021/03/03/Kubectl%E5%9F%BA%E6%9C%AC%E4%BD%BF%E7%94%A8/" title="kubectl 基本使用">kubectl 基本使用</a><time datetime="2021-03-03T11:46:36.000Z" title="发表于 2021-03-03 19:46:36">2021-03-03</time></div></div></div></div></div></div></main><footer id="footer"><div id="footer-wrap"><div class="copyright">&copy;2020 - 2021 By 张十三</div><div class="framework-info"><span>框架 </span><a target="_blank" rel="noopener" href="https://hexo.io">Hexo</a><span class="footer-separator">|</span><span>主题 </span><a target="_blank" rel="noopener" href="https://github.com/jerryc127/hexo-theme-butterfly">Butterfly</a></div></div></footer></div><div id="rightside"><div id="rightside-config-hide"><button id="readmode" type="button" title="阅读模式"><i class="fas fa-book-open"></i></button><button id="darkmode" type="button" title="浅色和深色模式转换"><i class="fas fa-adjust"></i></button><button id="hide-aside-btn" type="button" title="单栏和双栏切换"><i class="fas fa-arrows-alt-h"></i></button></div><div id="rightside-config-show"><button id="rightside_config" type="button" title="设置"><i class="fas fa-cog fa-spin"></i></button><button class="close" id="mobile-toc-button" type="button" title="目录"><i class="fas fa-list-ul"></i></button><button id="go-up" type="button" title="回到顶部"><i class="fas fa-arrow-up"></i></button></div></div><div><script src="/zaker/js/utils.js"></script><script src="/zaker/js/main.js"></script><script>var preloader = {
  endLoading: () => {
    document.body.style.overflow = 'auto';
    document.getElementById('loading-box').classList.add("loaded")
  },
  initLoading: () => {
    document.body.style.overflow = '';
    document.getElementById('loading-box').classList.remove("loaded")

  }
}
window.addEventListener('load',preloader.endLoading())</script><div class="js-pjax"></div><script defer="defer" id="ribbon" src="https://cdn.jsdelivr.net/npm/butterfly-extsrc@1/dist/canvas-ribbon.min.js" size="150" alpha="0.6" zIndex="-1" mobile="false" data-click="false"></script><script defer="defer" id="fluttering_ribbon" mobile="true" src="https://cdn.jsdelivr.net/npm/butterfly-extsrc@1/dist/canvas-fluttering-ribbon.min.js"></script><script id="canvas_nest" defer="defer" color="0,0,255" opacity="0.7" zIndex="-1" count="99" mobile="false" src="https://cdn.jsdelivr.net/npm/butterfly-extsrc@1/dist/canvas-nest.min.js"></script><script async data-pjax src="//busuanzi.ibruce.info/busuanzi/2.3/busuanzi.pure.mini.js"></script></div></body></html>